In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, count, date_format

# Create SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("ex7-q1") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Set log level
spark.sparkContext.setLogLevel("ERROR")

# Load the RDD
rdd = spark.sparkContext.textFile("file:///home/ahmad/bd-analytics/data/725053-94728-2022")

# Filter and map to extract day and temperature
temperature_rdd = rdd.filter(lambda line: len(line) >= 92) \
    .map(lambda line: (line[15:23], line[87], float(line[88:92]) / 10))   \
    .filter(lambda x: x[2] != 999.9) # ignore missing values usually reported as 9999 in ncdc noaa dataset

#print(temperature_rdd.collect())

# Adjust the temperature based on the sign
adjusted_temperature_rdd = temperature_rdd.map(lambda x: (x[0], x[2] * (-1 if x[1] == '-' else 1)))

# Reduce by key to calculate the sum and count of temperatures for each day
# ex : (day1 , (temp1, 1)), (day1, (temp2, 1)) => (day1, (temp1 + temp2, 1 + 1))
sum_count_rdd = adjusted_temperature_rdd.mapValues(lambda temp: (temp, 1)) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Map again to calculate the average temperature for each day
average_temperature_rdd = sum_count_rdd.mapValues(lambda x: x[0] / x[1])

#print(average_temperature_rdd.collect())

# Convert RDD to DataFrame
temperature_df = average_temperature_rdd.toDF(["Day", "AverageTemperature"])

#convert the day column to date type manually YYYYMMDD to YYYY-MM-DD
temperature_df = temperature_df.withColumn("Day", to_date(col("Day"), "yyyyMMdd"))
# Show the DataFrame
temperature_df.show()



+----------+-------------------+
|       Day| AverageTemperature|
+----------+-------------------+
|2022-01-03|               0.49|
|2022-01-04|-3.7500000000000013|
|2022-01-05|  3.311363636363637|
|2022-01-08|-3.8000000000000007|
|2022-01-09| 1.4249999999999996|
|2022-01-14| 4.1645161290322585|
|2022-01-16| -7.924999999999999|
|2022-01-18| 0.8083333333333335|
|2022-01-22| -6.391666666666666|
|2022-01-25| 2.9458333333333333|
|2022-01-28|-0.6297872340425534|
|2022-01-31| -3.895833333333332|
|2022-02-01|-1.8833333333333322|
|2022-02-02|  3.531111111111111|
|2022-02-04|  4.135714285714287|
|2022-02-07| 0.8871428571428575|
|2022-02-08| 2.8500000000000014|
|2022-02-09| 2.1333333333333333|
|2022-02-11|  8.591666666666667|
|2022-02-13| 0.3303030303030298|
+----------+-------------------+
only showing top 20 rows



In [14]:
# Read the taxi data
taxi_data = spark.read.option("header", "true").option("inferschema", "true").parquet("file:///home/ahmad/bd-analytics/data/yellow_tripdata_2022-01.parquet")


# Extract the date from the pickup datetime 
taxi_data = taxi_data.withColumn("pickup_date", to_date(col("tpep_pickup_datetime")))


# Group by date and count the number of trips for each day
daily_trip_count = taxi_data.groupBy("pickup_date").agg(count("*").alias("trip_count"))

# Show the daily trip count
daily_trip_count.show()




+-----------+----------+
|pickup_date|trip_count|
+-----------+----------+
| 2022-01-31|     85878|
| 2022-01-29|     34388|
| 2009-01-01|         8|
| 2022-03-15|         3|
| 2022-01-09|     64014|
| 2022-01-15|     88704|
| 2022-02-01|         6|
| 2022-01-18|     84603|
| 2022-01-10|     73692|
| 2022-01-08|     83177|
| 2022-01-24|     78541|
| 2022-04-06|         1|
| 2022-01-23|     76914|
| 2022-01-16|     72783|
| 2022-02-22|         1|
| 2022-01-13|     84952|
| 2022-03-09|         1|
| 2022-01-06|     79909|
| 2008-12-31|         6|
| 2022-05-18|         2|
+-----------+----------+
only showing top 20 rows



                                                                                

In [15]:
#join the temperature data with the daily trip count
joined_data = temperature_df.join(daily_trip_count, temperature_df.Day == daily_trip_count.pickup_date)
#remove the pickup_date column
joined_data = joined_data.drop("pickup_date")
joined_data.show()

                                                                                

+----------+-------------------+----------+
|       Day| AverageTemperature|trip_count|
+----------+-------------------+----------+
|2022-01-03|               0.49|     72405|
|2022-01-04|-3.7500000000000013|     74562|
|2022-01-05|  3.311363636363637|     74592|
|2022-01-08|-3.8000000000000007|     83177|
|2022-01-09| 1.4249999999999996|     64014|
|2022-01-14| 4.1645161290322585|     93817|
|2022-01-16| -7.924999999999999|     72783|
|2022-01-18| 0.8083333333333335|     84603|
|2022-01-22| -6.391666666666666|     96587|
|2022-01-25| 2.9458333333333333|     87349|
|2022-01-28|-0.6297872340425534|     95873|
|2022-01-31| -3.895833333333332|     85878|
|2022-02-01|-1.8833333333333322|         6|
|2022-03-15| 12.004166666666663|         3|
|2022-05-18| 17.316666666666666|         2|
|2022-01-01| 11.351063829787234|     63441|
|2022-01-02|   11.6271186440678|     58421|
|2022-01-06|  3.712499999999999|     79909|
|2022-01-07| -1.031818181818182|     71590|
|2022-01-10| 0.5037037037037031|

In [17]:
import altair as alt


scatter_plot = alt.Chart(joined_data.toPandas()).mark_circle().encode(
    x=alt.X('AverageTemperature', title='Average Temperature (°C)', scale=alt.Scale(zero=False)),
    y=alt.Y('trip_count', title='Trip Count'),
    tooltip=['Day', 'AverageTemperature', 'trip_count']
).interactive().properties(
    width=800,  # Set the width of the chart
    height=600  # Set the height of the chart
)

# Save the plot to a file
scatter_plot.save('scatter_plot.html')


                                                                                

In [None]:
scatter_plot.show()