In [14]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


In [12]:
# Local Spark session that uses every available core ("local[*]").
# getOrCreate() hands back the already-running session if this kernel has one;
# in that case only runtime SQL configurations from this builder take effect.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

24/03/03 20:49:53 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [22]:
# Display the active SparkSession as this cell's output.
spark

In [23]:
# Schema generation helper (kept for reference; not executed).
# NOTE(review): the mapping below turns every non-object pandas dtype (including
# float64) into IntegerType and leaves unparsed datetime strings as StringType,
# so it is only a rough starting point. The read in the conversion loop uses the
# hand-written fhv_schema instead.
# csv_file = "/home/vladimir/data-engineering-zoomcamp/05-batch/code/data/raw/fhv/2019/01/fhv_tripdata_2019_01.csv.gz"

# pandas_df = pd.read_csv(csv_file)

# spark_schema = StructType([
#     StructField(name, StringType() if dtype == "object" else IntegerType(), True)
#     for name, dtype in pandas_df.dtypes.items()  # Use items() instead of iteritems() (iteritems was removed in pandas 2.0)
# ])

In [24]:
# Explicit schema for the 2019 FHV trip CSVs, so timestamps and location IDs are
# typed on read. Without a schema, the CSV reader would treat every column as a string.
# Every field is declared nullable.
fhv_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.IntegerType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [25]:
year = 2019

# Convert each month of raw FHV CSVs (raw/fhv/{year}/{MM}/) into Parquet under
# pq/fhv/{year}/{MM}/. The "fhv" segment matters: the size check and the
# question cells read from .../data/pq/fhv/2019, so writing to .../data/pq/2019
# would leave them reading stale or missing data.
for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'/home/vladimir/data-engineering-zoomcamp/05-batch/code/data/raw/fhv/{year}/{month:02d}/'
    output_path = f'/home/vladimir/data-engineering-zoomcamp/05-batch/code/data/pq/fhv/{year}/{month:02d}/'

    df_fhv = spark.read \
        .option("header", "true") \
        .schema(fhv_schema) \
        .csv(input_path)

    # repartition(6) writes each month as 6 partitions (up to 6 part files);
    # mode("overwrite") replaces the month's directory, so re-running is safe.
    df_fhv \
        .repartition(6) \
        .write \
        .mode("overwrite") \
        .parquet(output_path)

processing data for 2019/1


                                                                                

processing data for 2019/2


                                                                                

processing data for 2019/3


                                                                                

processing data for 2019/4


                                                                                

processing data for 2019/5


                                                                                

processing data for 2019/6


                                                                                

processing data for 2019/7


                                                                                

processing data for 2019/8


                                                                                

processing data for 2019/9


                                                                                

processing data for 2019/10


                                                                                

processing data for 2019/11


                                                                                

processing data for 2019/12


                                                                                

In [26]:
import os

def get_parquet_files(directory):
    """Recursively collect the paths of all ``.parquet`` files under *directory*.

    Args:
        directory: Root directory to walk (every subdirectory is visited).

    Returns:
        A list of ``os.path.join(root, filename)`` paths, in ``os.walk`` order.
        Non-Parquet files (e.g. Spark's ``_SUCCESS`` or ``.crc`` side files) are
        skipped. The list is empty when nothing matches; ``os.walk`` also
        silently yields nothing for a directory that does not exist.
    """
    return [
        os.path.join(dirpath, filename)
        for dirpath, _subdirs, filenames in os.walk(directory)
        for filename in filenames
        if filename.endswith(".parquet")
    ]

# Root directory of the 2019 FHV Parquet output (all months, walked recursively).
parquet_root_dir = "/home/vladimir/data-engineering-zoomcamp/05-batch/code/data/pq/fhv/2019"

parquet_files = get_parquet_files(parquet_root_dir)

# An empty listing means a wrong path or the conversion has not run yet.
# Report that clearly instead of failing below with ZeroDivisionError.
if not parquet_files:
    raise FileNotFoundError(f"No .parquet files found under {parquet_root_dir}")

# Sum sizes in bytes, then convert once to megabytes (1024 * 1024 bytes).
total_size = sum(os.path.getsize(path) for path in parquet_files) / (1024 * 1024)

average_size = total_size / len(parquet_files)
print("Average size of Parquet files (MB):", average_size)


Average size of Parquet files (MB): 12.07019297281901


In [30]:
# Question 3: number of trips whose pickup falls on 2019-10-15.

# Register the October 2019 Parquet data as the SQL temp view "data_10"
# (question 4 below also queries this view).
(
    spark.read
    .parquet("/home/vladimir/data-engineering-zoomcamp/05-batch/code/data/pq/fhv/2019/10")
    .createOrReplaceTempView("data_10")
)

# Half-open range [2019-10-15 00:00, 2019-10-16 00:00) covers exactly that day.
spark.sql("""
SELECT count(1)
FROM data_10
WHERE pickup_datetime >= '2019-10-15 00:00:00'
  AND pickup_datetime < '2019-10-16 00:00:00'
""").show()

In [50]:
# Question 4: longest trip duration in the October data.
# Relies on the "data_10" temp view registered in the question 3 cell.
# Subtracting two timestamps yields a DAY TO SECOND interval, so the result is
# printed as an interval rather than a number of hours.

spark.sql("""
SELECT max(trip_time)
FROM (
    SELECT dropOff_datetime - pickup_datetime AS trip_time
    FROM data_10
)
""").show(n=1, truncate=False)

+---------------------------------------+
|max(trip_time)                         |
+---------------------------------------+
|INTERVAL '26298 00:30:00' DAY TO SECOND|
+---------------------------------------+



In [70]:
# Download the taxi zone lookup table into the current working directory.
# Question 6 reads it via the relative path "taxi_zone_lookup.csv".
# Uncomment to fetch it on a fresh checkout.
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [72]:
# Question 6: October 2019 pickup counts per zone, least frequent first.

# Zone lookup table, expected in the current working directory.
(
    spark.read
    .csv("taxi_zone_lookup.csv", header=True, inferSchema=True)
    .createOrReplaceTempView("zones")
)
# Re-register the October data so this cell does not depend on earlier cells.
(
    spark.read
    .parquet("/home/vladimir/data-engineering-zoomcamp/05-batch/code/data/pq/fhv/2019/10")
    .createOrReplaceTempView("data_10")
)

# Inner join: trips whose PUlocationID has no matching LocationID are dropped.
spark.sql("""
SELECT zones.Zone, count(1) AS cnt
FROM data_10
 JOIN zones
   ON zones.LocationID = data_10.PUlocationID
 GROUP BY zones.Zone
 ORDER BY cnt
""").show()

+--------------------+---+
|                Zone|cnt|
+--------------------+---+
|         Jamaica Bay|  1|
|Governor's Island...|  2|
| Green-Wood Cemetery|  5|
|       Broad Channel|  8|
|     Highbridge Park| 14|
|        Battery Park| 15|
|Saint Michaels Ce...| 23|
|Breezy Point/Fort...| 25|
|Marine Park/Floyd...| 26|
|        Astoria Park| 29|
|    Inwood Hill Park| 39|
|       Willets Point| 47|
|Forest Park/Highl...| 53|
|  Brooklyn Navy Yard| 57|
|        Crotona Park| 62|
|        Country Club| 77|
|     Freshkills Park| 89|
|       Prospect Park| 98|
|     Columbia Street|105|
|  South Williamsburg|110|
+--------------------+---+
only showing top 20 rows

