In [7]:
import os
import requests

# GitHub contents-API listing of the repository folder that holds the monthly NYC
# yellow-taxi parquet files.
api_url = 'https://api.github.com/repos/erkansirin78/datasets/contents/nyc_taxi_yellow_trip_raw'
# Seconds to wait on connect/read; without a timeout `requests` can block indefinitely.
request_timeout = 60
# Bytes written per chunk while streaming a file to disk.
download_chunk_size = 1024 * 1024

response = requests.get(api_url, timeout=request_timeout)

if response.status_code == 200:
    files = response.json()

    output_directory = 'nyc_taxi_parquet_files'
    os.makedirs(output_directory, exist_ok=True)

    for file_info in files:
        # Only regular parquet files; directory entries have no downloadable content.
        if file_info.get('type') != 'file' or not file_info['name'].endswith('.parquet'):
            continue

        file_url = file_info['download_url']
        parquet_file_name = file_info['name']
        target_path = os.path.join(output_directory, parquet_file_name)
        # Download under a temporary name and rename only once complete, so an interrupted
        # transfer never leaves a truncated `.parquet` file for Spark to read later.
        partial_path = target_path + '.part'

        # Stream the body in chunks instead of holding the whole file in memory.
        with requests.get(file_url, stream=True, timeout=request_timeout) as file_response:
            if file_response.status_code == 200:
                with open(partial_path, 'wb') as f:
                    for chunk in file_response.iter_content(chunk_size=download_chunk_size):
                        f.write(chunk)
                os.replace(partial_path, target_path)
                print(f"Downloaded: {parquet_file_name}")
            else:
                print(f"Failed to download: {parquet_file_name} (status code: {file_response.status_code})")
else:
    print(f"Failed to retrieve file list (status code: {response.status_code})")

Downloaded: yellow_tripdata_2022-01.parquet
Downloaded: yellow_tripdata_2022-02.parquet
Downloaded: yellow_tripdata_2022-03.parquet
Downloaded: yellow_tripdata_2022-04.parquet
Downloaded: yellow_tripdata_2022-05.parquet
Downloaded: yellow_tripdata_2022-06.parquet
Downloaded: yellow_tripdata_2022-07.parquet
Downloaded: yellow_tripdata_2022-08.parquet
Downloaded: yellow_tripdata_2022-09.parquet
Downloaded: yellow_tripdata_2022-10.parquet
Downloaded: yellow_tripdata_2022-11.parquet
Downloaded: yellow_tripdata_2022-12.parquet
Downloaded: yellow_tripdata_2023-01.parquet
Downloaded: yellow_tripdata_2023-02.parquet
Downloaded: yellow_tripdata_2023-03.parquet
Downloaded: yellow_tripdata_2023-04.parquet
Downloaded: yellow_tripdata_2023-05.parquet
Downloaded: yellow_tripdata_2023-06.parquet
Downloaded: yellow_tripdata_2023-07.parquet
Downloaded: yellow_tripdata_2023-08.parquet
Downloaded: yellow_tripdata_2023-09.parquet
Downloaded: yellow_tripdata_2023-10.parquet
Downloaded: yellow_tripdata_2023

In [19]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import configparser
from pyspark.sql import functions as F

In [10]:
# MinIO credentials are read from the environment (or prompted for interactively) instead of
# being hard-coded, so they are not stored in the saved notebook or its version history.
# NOTE(review): a secret was previously committed in this cell — rotate it.
import getpass
import os

accessKeyId = os.environ.get("MINIO_ACCESS_KEY") or getpass.getpass("MinIO access key: ")
secretAccessKey = os.environ.get("MINIO_SECRET_KEY") or getpass.getpass("MinIO secret key: ")

# Local Spark session configured for the S3A connector against the MinIO endpoint.
# NOTE(review): hadoop-aws must match the Hadoop version bundled with this Spark build — confirm 3.2.0.
# NOTE(review): when a session already exists, getOrCreate() reuses it and only runtime SQL
# settings take effect (see the warning in this cell's output); master and
# spark.jars.packages changes need a kernel restart.
spark = (
    SparkSession.builder
    .appName("Week8")
    .master("local[2]")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
    .config("fs.s3a.access.key", accessKeyId)
    .config("fs.s3a.secret.key", secretAccessKey)
    .config("fs.s3a.path.style.access", True)
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("fs.s3a.endpoint", "http://minio:9000")
    .getOrCreate()
)

24/10/31 14:02:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [41]:
# Superseded first attempt: read every raw monthly file in a single pass.
# NOTE(review): presumably abandoned because column types/names differ between months
# (the following cells cast each file to a common schema instead) — confirm, then delete
# this dead cell.
# parquet_directory = "file:///opt/examples/nyc_taxi_parquet_files"
# df = spark.read.parquet(parquet_directory)

In [55]:
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, DoubleType, StringType, DateType

# Target column types for the NYC yellow-taxi monthly files; every field is nullable.
# Each (name, type) pair below becomes one StructField, in this order.
# NOTE(review): `DropOffDay` does not appear among the columns of the sample output below,
# so a per-column cast never applies to it; the partition date is derived later as `day` —
# consider removing this entry.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("VendorID", LongType()),
        ("tpep_pickup_datetime", TimestampType()),
        ("tpep_dropoff_datetime", TimestampType()),
        ("passenger_count", DoubleType()),
        ("trip_distance", DoubleType()),
        ("RatecodeID", DoubleType()),
        ("store_and_fwd_flag", StringType()),
        ("PULocationID", LongType()),
        ("DOLocationID", LongType()),
        ("payment_type", LongType()),
        ("fare_amount", DoubleType()),
        ("extra", DoubleType()),
        ("mta_tax", DoubleType()),
        ("tip_amount", DoubleType()),
        ("tolls_amount", DoubleType()),
        ("improvement_surcharge", DoubleType()),
        ("total_amount", DoubleType()),
        ("congestion_surcharge", DoubleType()),
        ("Airport_fee", DoubleType()),
        ("DropOffDay", DateType()),
    )
])

In [54]:
# Raw monthly files (downloaded by the first cell) and the destination for per-file,
# type-normalised copies. Later cells read `converted_directory` through a `file://` URI.
raw_local_directory = "/opt/examples/nyc_taxi_parquet_files"
parquet_directory = f"file://{raw_local_directory}"
converted_directory = "/opt/examples/nyc_taxi_parquet_files_converted"

# Schema fields keyed by lower-cased name. The raw files are not consistent about column-name
# case (e.g. `Airport_fee` vs `airport_fee`), so an exact-name match would silently skip the
# cast for some months and leave their types diverging.
schema_fields_by_lower_name = {field.name.lower(): field for field in schema.fields}

# Only real parquet files, in a deterministic order: skips hidden entries such as
# `.ipynb_checkpoints` and any partial downloads.
raw_file_names = sorted(
    name for name in os.listdir(raw_local_directory) if name.endswith(".parquet")
)

for file_name in raw_file_names:
    monthly_df = spark.read.parquet(f"{parquet_directory}/{file_name}")

    # Cast every column that has a schema entry and rename it to the schema's spelling;
    # columns without an entry pass through unchanged. Column order is preserved.
    normalised_columns = []
    for column_name in monthly_df.columns:
        target_field = schema_fields_by_lower_name.get(column_name.lower())
        if target_field is None:
            normalised_columns.append(monthly_df[column_name])
        else:
            normalised_columns.append(
                monthly_df[column_name].cast(target_field.dataType).alias(target_field.name)
            )
    monthly_df = monthly_df.select(normalised_columns)

    # Explicit file:// URI so the output lands exactly where later cells read it from,
    # independent of the session's default filesystem.
    monthly_df.write.mode("overwrite").parquet(f"file://{converted_directory}/{file_name}")


                                                                                

In [59]:
# Spot-check one converted month; the conversion cell writes each month as its own
# parquet dataset directory named after the source file.
sample_file = f"file://{converted_directory}/yellow_tripdata_2024-05.parquet"
sample_df = spark.read.format("parquet").load(sample_file)
sample_df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2024-05-01 00:59:15|  2024-05-01 01:23:50|            1.0|          6.1|       1.0|                 N|         138|         145|           1|       28.2| 7.75|    0.5|       5.

In [65]:
# Load all converted months as a single DataFrame.
# - mergeSchema: unions the column sets of the individual files.
# - recursiveFileLookup: descends into each per-month output directory. It also turns off
#   partition inference.
df = (
    spark.read
    .options(mergeSchema="true", recursiveFileLookup="true")
    .parquet(f"file://{converted_directory}")
)
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-10-01 00:03:41|  2022-10-01 00:18:39|            1.0|          1.7|       1.0|                 N|         249|         107|           1|        9.5|  3.0|    0.5|      2.6

In [68]:
# Calendar date of the drop-off timestamp; this is the partition key used for the MinIO write.
df = df.withColumn("day", F.to_date("tpep_dropoff_datetime"))

In [69]:
# Preview the first rows (Spark's defaults made explicit: 20 rows, long values truncated).
# NOTE(review): the saved output below lists a `DropOffDay` column that is absent from the
# merged-read output and is not added by the `day` cell — it came from a cell no longer in
# the notebook, so a fresh Restart & Run All will not reproduce this output.
df.show(n=20, truncate=True)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|DropOffDay|       day|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+----------+
|       1| 2022-10-01 00:03:41|  2022-10-01 00:18:39|            1.0|          1.7|       1.0|                 N|        

In [70]:
# Destination dataset in the MinIO bucket, split into one sub-directory per `day` value.
minio_path = "s3a://week8hw/nyc_yellow_pby_day_parquet"
minio_bucket = "s3a://week8hw"  # not referenced by this cell
partition_column = "day"

# Snappy-compressed parquet written in `overwrite` mode.
# NOTE(review): under Spark's default static partitionOverwriteMode this replaces the whole
# dataset at `minio_path`, not only the days present in `df` — confirm that is intended.
(
    df.write
    .mode("overwrite")
    .partitionBy(partition_column)
    .option("compression", "snappy")
    .parquet(minio_path)
)


24/10/31 14:52:52 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                