In [1]:
# Load pyspark, pandas
from pyspark import SparkConf, SparkContext
import pandas as pd

In [2]:
# Configure Spark
conf = SparkConf().setMaster("local").setAppName("uber-date-trips")
sc = SparkContext(conf=conf)

In [3]:
# Dataset sample
filename = "fhvhv_tripdata_2020-03_short.csv"

In [5]:
# Data parsing
lines = sc.textFile("./data/" + filename)   # .csv -> RDD object
header = lines.first()
filtered_lines = lines.filter(lambda row:row != header) # all lines excepting the header

./data/fhvhv_tripdata_2020-03_short.csv


In [35]:
"""
    Run the Spark job
    - map(): apply the transformation on every element of RDD -> new RDD
    - countByValue(): action that returns the count of each unique value
    - x.split(", ")[2]: extract the value of pickup_datetime from a row
    e.g., 2020-03-01 00:03:40
    - .split(" ")[0]: extract the date from the pickup_datetime
    e.g., 2020-03-01
"""

dates = filtered_lines.map(lambda x: x.split(",")[0])
result = dates.countByValue()

In [18]:
print(result)

defaultdict(<class 'int'>, {'HV0005': 1176143, 'HV0003': 3537637, 'HV0004': 147216})


In [25]:
# Save results as a csv file
pd.Series(result, name="trips").to_csv("./data/trips_date.csv")

In [44]:
df = pd.DataFrame(list(result.items()), columns=['Key','Value'])

# Key 열의 0과 (0,1)인 행을 선택하여 제거하는 연산을 함
key_idx = df['Key'].isin( [ 0, (0,1) ] )
new_df = df.drop(df[key_idx].index)

print(new_df)

      Key    Value
0  HV0005  1176143
1  HV0003  3537637
2  HV0004   147216
