In [4]:
from google.colab import drive

# Mount Google Drive inside the Colab VM; the notebook's input and output
# paths all live under this mount point.
DRIVE_MOUNT_POINT = "/content/drive"
drive.mount(DRIVE_MOUNT_POINT)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
!pip install -q pyspark

In [3]:
import os

# Optional locations of a system JDK and a manually unpacked Spark distribution.
JAVA_DIR = "/usr/lib/jvm/java-11-openjdk-amd64"
SPARK_DIR = "/content/spark-3.5.0-bin-hadoop3"

# Export a variable only when its directory really exists. A pip-installed
# PySpark ships its own Spark runtime; pointing SPARK_HOME (or JAVA_HOME) at a
# missing directory makes the launcher look for executables there and fail.
if os.path.isdir(JAVA_DIR):
    os.environ["JAVA_HOME"] = JAVA_DIR

if os.path.isdir(SPARK_DIR):
    os.environ["SPARK_HOME"] = SPARK_DIR

    # Append the Spark bin directory once, so re-running this cell does not
    # keep growing PATH with duplicate entries.
    spark_bin = f"{SPARK_DIR}/bin"
    current_path = os.environ.get("PATH", "")
    if spark_bin not in current_path.split(os.pathsep):
        os.environ["PATH"] = (
            f"{current_path}{os.pathsep}{spark_bin}" if current_path else spark_bin
        )


In [1]:
from pyspark.sql import SparkSession

# Configure a local-mode session (master "local[*]") with the console
# progress bar switched off, then create it or reuse an existing one.
session_builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("NYC Taxi Analysis")
    .config("spark.ui.showConsoleProgress", "false")
)
spark = session_builder.getOrCreate()

# Display the session summary as the cell output.
spark


In [2]:
# Trip records CSV stored on the mounted Google Drive.
TAXI_CSV_PATH = "/content/drive/MyDrive/Colab/nyc_yellow_taxi_trip_records_from_Jan_to_Aug_2023.csv"

# Treat the first line as column names and let Spark infer the column types.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv(TAXI_CSV_PATH)

# Quick sanity check: inferred schema plus the first five rows.
df.printSchema()
df.show(5)


root
 |-- _c0: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee19: double (nullable = true)
 |-- airport_fee20: double (nullable = true)

+---+--------+--------------------+--------

In [6]:
from pyspark.sql.functions import hour, avg, sum, count

# Register the DataFrame under the table name referenced by the query.
df.createOrReplaceTempView("trips")

# Per pickup hour: number of trips, mean distance and summed revenue
# (both rounded to 2 decimals), ordered by revenue, highest first.
hourly_revenue_sql = """
SELECT
    hour(tpep_pickup_datetime) AS hour,
    COUNT(*) AS trips_count,
    ROUND(AVG(trip_distance), 2) AS avg_distance,
    ROUND(SUM(total_amount), 2) AS total_revenue
FROM trips
GROUP BY hour
ORDER BY total_revenue DESC
"""

hourly_revenue_sql_df = spark.sql(hourly_revenue_sql)
hourly_revenue_sql_df.show(24)


+----+-----------+------------+-------------+
|hour|trips_count|avg_distance|total_revenue|
+----+-----------+------------+-------------+
|  17|    1316848|        4.08|3.852870336E7|
|  18|    1385190|        3.71|3.808753236E7|
|  16|    1209878|        3.85|3.720181526E7|
|  15|    1204698|         4.2|3.465547104E7|
|  14|    1183703|        4.04|3.434853213E7|
|  19|    1232699|        3.73|3.397206557E7|
|  13|    1100656|        3.86|3.113179763E7|
|  21|    1089731|        3.84|3.006295055E7|
|  20|    1089798|        3.55|2.969124112E7|
|  22|    1014446|        4.18|2.898313068E7|
|  12|    1064579|        3.54|2.877389607E7|
|  11|     982335|        3.33|2.604133068E7|
|  23|     801988|        4.32|2.400605817E7|
|  10|     904466|        3.32| 2.37936807E7|
|   9|     835130|        4.07|2.186506263E7|
|   8|     741035|        4.34|1.931752442E7|
|   0|     546260|         4.1|1.565619992E7|
|   7|     536290|        6.15|1.460285599E7|
|   1|     364547|        3.82|   

In [7]:
from pyspark.sql.functions import hour, avg, sum, count

# Derive the pickup hour as its own column so it can be used as the group key.
trips_with_hour = df.withColumn("hour", hour("tpep_pickup_datetime"))

# Aggregates computed per hour. NOTE: `sum`, `count` and `avg` here are the
# pyspark.sql.functions imported at the top of this cell, not Python builtins.
hourly_metrics = [
    count("*").alias("trips_count"),
    avg("trip_distance").alias("avg_distance"),
    sum("total_amount").alias("total_revenue"),
]

# Group, aggregate and order by revenue, highest first.
spark_result = (
    trips_with_hour
    .groupBy("hour")
    .agg(*hourly_metrics)
    .orderBy("total_revenue", ascending=False)
)

spark_result.show(24)


+----+-----------+------------------+--------------------+
|hour|trips_count|      avg_distance|       total_revenue|
+----+-----------+------------------+--------------------+
|  17|    1316848| 4.078425930707257|3.8528703360000156E7|
|  18|    1385190| 3.710669489384137| 3.808753236000049E7|
|  16|    1209878| 3.851717445891244|3.7201815260000326E7|
|  15|    1204698| 4.202593695681405|3.4655471040000595E7|
|  14|    1183703|4.0401657594852916| 3.434853213000064E7|
|  19|    1232699| 3.731510247027062|3.3972065570000984E7|
|  13|    1100656| 3.861419353549155|3.1131797630000565E7|
|  21|    1089731|3.8418781056976434|3.0062950549999878E7|
|  20|    1089798| 3.549379894255636|2.9691241119999923E7|
|  22|    1014446| 4.182336506822446|2.8983130679999884E7|
|  12|    1064579| 3.543485283854002| 2.877389607000057E7|
|  11|     982335| 3.329002438068473| 2.604133068000036E7|
|  23|     801988| 4.321283049122934|2.4006058170000102E7|
|  10|     904466|3.3190092828254527|2.3793680700000335E

| Подход    | Описание                          |
| --------- | --------------------------------- |
| Spark SQL | Аналог Hive (декларативный стиль) |
| Spark API | Программный стиль, гибче при композиции; скорость та же, что у Spark SQL (общий оптимизатор Catalyst) |



Логика DAG
[Загрузка данных]
        ->
[Hadoop-style SQL]
        ->
[Spark API]
        ->
[Экспорт результата]


In [8]:
# Target directory on Drive for the aggregated result.
HOURLY_REVENUE_DIR = "/content/drive/MyDrive/bigdata/output/hourly_revenue"

# Reduce to one partition before writing, include a header row, and
# overwrite whatever a previous run left at the target location.
single_partition_writer = (
    spark_result
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
)
single_partition_writer.csv(HOURLY_REVENUE_DIR)


In [11]:
import glob
from pathlib import Path

import pandas as pd

# Directory written by the Spark CSV export; Spark stores the rows in
# part file(s) inside it rather than in a single named file.
path = "/content/drive/MyDrive/bigdata/output/hourly_revenue"
part_files = sorted(glob.glob(f"{path}/*.csv"))
if not part_files:
    # Give a readable error instead of a bare IndexError from `[0]`.
    raise FileNotFoundError(
        f"No CSV part file found in {path}; run the Spark export cell first."
    )
csv_file = part_files[0]

# Load the aggregated result into pandas (the user-facing format).
hourly_raw = pd.read_csv(csv_file)

# Order by hour of day and round the float metrics to 2 decimals.
datamart_df = (
    hourly_raw
    .sort_values("hour")
    .round({"avg_distance": 2, "total_revenue": 2})
)

# Save the data mart. `to_csv` does not create missing folders, so make sure
# the target directory exists before writing.
datamart_path = "/content/drive/MyDrive/bigdata/datamart/hourly_revenue_datamart.csv"
Path(datamart_path).parent.mkdir(parents=True, exist_ok=True)
datamart_df.to_csv(datamart_path, index=False)

datamart_df.head(24)


Unnamed: 0,hour,trips_count,avg_distance,total_revenue
16,0,546260,4.1,15656199.92
18,1,364547,3.82,9615453.18
20,2,240078,3.81,5960529.53
22,3,158619,3.64,4120743.25
23,4,106089,9.59,3479880.51
21,5,112434,14.29,4254903.96
19,6,271562,8.11,8573215.07
17,7,536290,6.15,14602855.99
15,8,741035,4.34,19317524.42
14,9,835130,4.07,21865062.63


In [None]:
# airflow_dag.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Task-level defaults handed to the DAG through its `default_args` argument.
default_args = dict(
    owner="student",
    depends_on_past=False,
    start_date=datetime(2024, 1, 1),
    retries=1,
    # Pause between a failed attempt and its retry.
    retry_delay=timedelta(minutes=5),
)

def build_datamart():
    """Callable for the data-mart task; currently it only prints a status line."""
    print("Building user-friendly data mart (CSV for BI / analytics)")


# Daily pipeline of four placeholder tasks that run strictly one after another.
# NOTE(review): newer Airflow releases prefer `schedule` over
# `schedule_interval` - confirm against the target Airflow version.
with DAG(
    dag_id="nyc_taxi_bigdata_pipeline",
    description="Automated Spark pipeline for NYC Taxi data",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    tags=["spark", "bigdata", "education"],
) as dag:

    # Step 1: data loading (placeholder that echoes a message).
    load_data = BashOperator(
        task_id="load_data",
        bash_command="""
        echo "Loading NYC Taxi data from Google Drive (Data Lake)"
        """,
    )

    # Step 2: Hadoop-style analysis via Spark SQL (placeholder).
    hadoop_style = BashOperator(
        task_id="hadoop_style_analysis",
        bash_command="""
        echo "Running Spark SQL (Hive / MapReduce analogue)"
        """,
    )

    # Step 3: Spark DataFrame API aggregation (placeholder).
    spark_api = BashOperator(
        task_id="spark_api_analysis",
        bash_command="""
        echo "Running Spark DataFrame API aggregation"
        """,
    )

    # Step 4: build the data mart.
    datamart = PythonOperator(
        task_id="build_datamart",
        python_callable=build_datamart,
    )

    # Linear ordering: load -> SQL analysis -> API analysis -> data mart.
    load_data >> hadoop_style
    hadoop_style >> spark_api
    spark_api >> datamart
