In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, year, to_date
from pyspark.sql.types import *
import pandas as pd
from io import StringIO
import os
import requests

os.environ['PYSPARK_PYTHON'] = '/opt/jupyterhub/miniconda/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/jupyterhub/miniconda/bin/python'

# Создаем SparkSession с поддержкой HDFS
spark = SparkSession.builder \
    .appName("NYC Taxi Data to HDFS") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://master:9000") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.bindAddress", "0.0.0.0") \
    .config("spark.executor.memory", "1g") \
    .config("spark.driver.memory", "1g") \
    .config("spark.network.timeout", "600s") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

# Проверка подключения
print("SparkSession успешно создан!")
print("Версия Spark:", spark.version)

In [None]:
def download_and_load(target_year, url, max_rows=10000):
    print(f"Скачиваем данные за {target_year} год...")
    
    # Скачиваем данные
    response = requests.get(url, stream=True)
    response.raise_for_status()
    
    # Читаем только нужное количество строк
    lines = []
    line_count = 0
    for line in response.iter_lines():
        if max_rows is None or line_count <= max_rows:
            lines.append(line.decode('utf-8'))
            line_count += 1
        else:
            break
    
    # Создаем pandas DataFrame из строк
    pdf = pd.read_csv(StringIO('\n'.join(lines)))
    
    pdf["tpep_pickup_datetime"] = pdf["tpep_pickup_datetime"].astype(str)
    pdf["tpep_dropoff_datetime"] = pdf["tpep_dropoff_datetime"].astype(str)
    pdf["store_and_fwd_flag"] = pdf["store_and_fwd_flag"].astype(str)
    
    # Конвертируем в Spark DataFrame
    df = spark.createDataFrame(pdf)
    
    # Парсим дату с учетом AM/PM
    df = df.withColumn(
        "tpep_pickup_datetime", 
        to_timestamp(col("tpep_pickup_datetime"), "MM/dd/yyyy h:mm:ss a")
    ).withColumn(
        "tpep_dropoff_datetime", 
        to_timestamp(col("tpep_dropoff_datetime"), "MM/dd/yyyy h:mm:ss a")
    )
    
    # Фильтруем по году
    result_df = df.filter(year(col("tpep_pickup_datetime")) == target_year)
    
    return result_df

# Загружаем данные
taxi_2019 = download_and_load(2019, "https://data.cityofnewyork.us/api/views/2upf-qytp/rows.csv?accessType=DOWNLOAD")
taxi_2020 = download_and_load(2020, "https://data.cityofnewyork.us/api/views/kxp8-n2sj/rows.csv?accessType=DOWNLOAD")
taxi_2021 = download_and_load(2021, "https://data.cityofnewyork.us/api/views/m6nq-qud6/rows.csv?accessType=DOWNLOAD")

# Проверяем количество строк
print(f"2019: {taxi_2019.count()} записей")
print(f"2020: {taxi_2020.count()} записей")
print(f"2021: {taxi_2021.count()} записей")

In [None]:
# Определяем схему для приведения типов
taxi_schema = StructType([
    StructField("VendorID", IntegerType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("congestion_surcharge", DoubleType(), True)
])

# Функция для очистки данных
def clean_taxi_data(df):
    # Приводим типы данных
    df = spark.createDataFrame(df.rdd, taxi_schema)
    
    # Обрабатываем пропущенные значения
    df = df.fillna({
        "passenger_count": 1,
        "trip_distance": 0.0,
        "RatecodeID": 1,
        "payment_type": 1,
        "fare_amount": 0.0,
        "extra": 0.0,
        "mta_tax": 0.0,
        "tip_amount": 0.0,
        "tolls_amount": 0.0,
        "improvement_surcharge": 0.0,
        "total_amount": 0.0,
        "congestion_surcharge": 0.0
    })
    
    # Добавляем столбец с датой для партиционирования
    df = df.withColumn("pickup_date", to_date(col("tpep_pickup_datetime")))
    
    return df

# Очищаем данные для каждого года
taxi_2019_clean = clean_taxi_data(taxi_2019)
taxi_2020_clean = clean_taxi_data(taxi_2020)
taxi_2021_clean = clean_taxi_data(taxi_2021)

# Проверяем схему после очистки
taxi_2020_clean.printSchema()

In [None]:
# Функция для сохранения данных
def save_to_hdfs(df, year):
    output_path = f"hdfs://master:9000/user/alice/data/yellow_taxi/{year}"
    df.write \
        .partitionBy("pickup_date") \
        .mode("overwrite") \
        .parquet(output_path)
    print(f"Данные за {year} год сохранены в {output_path}")

# Сохраняем данные за каждый год
save_to_hdfs(taxi_2019_clean, "2019")
save_to_hdfs(taxi_2020_clean, "2020")
save_to_hdfs(taxi_2021_clean, "2021")

In [None]:
# Функция для проверки сохраненных данных
def check_hdfs_data(year):
    path = f"hdfs://master:9000/user/alice/data/yellow_taxi/{year}"
    df = spark.read.parquet(path)
    print(f"Проверка данных за {year} год:")
    print(f"Количество записей: {df.count()}")
    print(f"Даты партиций: {df.select('pickup_date').distinct().count()} уникальных дней")
    df.show(5)

check_hdfs_data("2019")
check_hdfs_data("2020")
check_hdfs_data("2021")

In [None]:
spark.stop()