In [76]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import types

Initialize the Spark session

In [77]:
# Start (or reuse) a Spark session running in local mode under the app name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [78]:
!mkdir data
!wget -P data/ https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

mkdir: data: File exists
--2023-03-07 16:10:21--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz
Translacja github.com (github.com)... 140.82.121.4
Łączenie się z github.com (github.com)|140.82.121.4|:443... połączono.
Żądanie HTTP wysłano, oczekiwanie na odpowiedź... 302 Found
Lokalizacja: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/035746e8-4e24-47e8-a3ce-edcf6d1b11c7?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230307%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230307T151021Z&X-Amz-Expires=300&X-Amz-Signature=966a3c9844036f5c6c38598ea7851c1cf6393316f3a94ee04048bb5dbde1784f&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-01.csv.gz&response-content-type=application%2Foctet-stream [podążanie]
--2023-03-07 16:10:21--  https://objects.githubusercontent.com/github-production

Check number of rows

In [79]:
!wc -l data/fhvhv_tripdata_2021-01.csv.gz

  508066 data/fhvhv_tripdata_2021-01.csv.gz


In [80]:
# Read the gzipped CSV with its header row; no schema is supplied here.
df = spark.read.csv(
    'data/fhvhv_tripdata_2021-01.csv.gz',
    header=True,
    encoding='utf-8',
)

In [81]:
# Preview the first five rows of the schemaless read.
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID='152', DOLocationID='167', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:23:56', dropoff_datetime='2021-01-01 00:38:05', PULocationID='233', DOLocationID='142', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:42:51', dropoff_datetime='2021-01-01 00:45:50', PULocationID='142', DOLocationID='143', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:48:14', dropoff_datetime='2021-01-01 01:08:42', PULocationID='143', DOLocationID='78', SR_Flag=None)]

All fields are read as StringType, because no schema was supplied and Spark does not infer column types by default.

In [82]:
# Inspect the schema Spark assigned to the DataFrame read above.
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True)])

In [49]:
# Disabled: `head` on the .gz would cut the compressed byte stream rather than
# CSV rows; the pandas cell below samples the file with nrows= instead.
#!head -n 1001 data/fhvhv_tripdata_2021-01.csv.gz > data/head.csv.gz

Create pandas DataFrame

In [83]:
# Load only a small sample (nrows data rows after the header) into pandas.
df_pandas = pd.read_csv(
    'data/fhvhv_tripdata_2021-01.csv.gz',
    nrows=1001,
)
df_pandas

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
0,HV0003,B02682,2021-01-01 00:33:44,2021-01-01 00:49:07,230,166,
1,HV0003,B02682,2021-01-01 00:55:19,2021-01-01 01:18:21,152,167,
2,HV0003,B02764,2021-01-01 00:23:56,2021-01-01 00:38:05,233,142,
3,HV0003,B02764,2021-01-01 00:42:51,2021-01-01 00:45:50,142,143,
4,HV0003,B02764,2021-01-01 00:48:14,2021-01-01 01:08:42,143,78,
...,...,...,...,...,...,...,...
996,HV0005,B02510,2021-01-01 00:53:10,2021-01-01 01:21:09,114,21,
997,HV0003,B02512,2021-01-01 00:18:35,2021-01-01 00:26:53,167,42,
998,HV0003,B02512,2021-01-01 00:35:39,2021-01-01 00:42:59,116,116,
999,HV0003,B02512,2021-01-01 00:44:54,2021-01-01 01:15:59,116,132,


In [84]:
# Column dtypes pandas inferred from the sample.
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

Convert pandas DataFrame into Spark DataFrame

In [85]:
# Convert the pandas sample to a Spark DataFrame and show the resulting schema.
spark.createDataFrame(df_pandas).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

Modify schema

In [86]:
# Explicit schema for the trip data: every column is nullable, the two
# datetime columns are timestamps and the location IDs are longs.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.LongType()),
        ('DOLocationID', types.LongType()),
        ('SR_Flag', types.StringType()),
    ]
])

Read the .csv file with the explicit schema

In [87]:
# Re-read the same gzipped CSV, this time applying the explicit schema.
df = spark.read.csv(
    'data/fhvhv_tripdata_2021-01.csv.gz',
    schema=schema,
    header=True,
    encoding='utf-8',
)

In [88]:
# Preview five rows again, now parsed with the explicit schema.
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 45, 50), PULocationID=142, DOLocationID=143, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_dat

Repartition the DataFrame into 24 partitions

In [89]:
# Redistribute the rows into 24 partitions.
df = df.repartition(numPartitions=24)

In [90]:
# Write the repartitioned data as parquet.
# mode='overwrite' keeps the cell re-runnable: the default save mode raises
# an error when the output path already exists.
df.write.parquet('data/fhvhv/2021/01/', mode='overwrite')

[Stage 21:>                                                        (0 + 8) / 24]

23/03/07 16:28:16 WARN MemoryManager: Total allocation exceeds 95,00% (1 020 054 720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers




23/03/07 16:28:24 WARN MemoryManager: Total allocation exceeds 95,00% (1 020 054 720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers




23/03/07 16:28:29 WARN MemoryManager: Total allocation exceeds 95,00% (1 020 054 720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers


                                                                                