In [0]:
# Read the two quarterly Divvy trip exports, using each file's first line as
# the column names. No schema is supplied and schema inference is not
# enabled, so the reader is expected to produce string columns throughout.
df1 = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/qureshiferoz988@gmail.com/Divvy_Trips_2020_Q1.csv",
    header=True,
)
df2 = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/qureshiferoz988@gmail.com/Divvy_Trips_2019_Q4.csv",
    header=True,
)

In [0]:
from pyspark.sql.functions import count, avg, last, to_date, month, desc, dense_rank, col, rank
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql import SparkSession

def calculate_average_duration_per_day(df2):
    """Return the mean ``tripduration`` for every trip start date.

    Args:
        df2: Spark DataFrame with ``start_time`` and ``tripduration`` columns.

    Returns:
        DataFrame with one row per ``start_date`` and an
        ``average_duration`` column holding the mean of ``tripduration``.
    """
    # Collapse the start timestamp to its calendar date so trips group by day.
    with_day = df2.withColumn("start_date", to_date("start_time"))
    daily_mean = avg("tripduration").alias("average_duration")
    return with_day.groupby("start_date").agg(daily_mean)

def calculate_number_of_trips_per_day(df2):
    """Return how many trips started on each date.

    Args:
        df2: Spark DataFrame with ``start_time`` and ``tripduration`` columns.

    Returns:
        DataFrame with one row per ``start_date`` and a
        ``number_of_trips_per_day`` column.
    """
    with_day = df2.withColumn("start_date", to_date("start_time"))
    # count() over a named column only counts rows where that column is
    # non-null, so trips with a null ``tripduration`` are not included.
    trips_that_day = count("tripduration").alias("number_of_trips_per_day")
    return with_day.groupby("start_date").agg(trips_that_day)

def calculate_most_popular_start_station(df1):
    """Return the start station(s) with the most trips in each month.

    Args:
        df1: Spark DataFrame with ``started_at`` and ``start_station_name``
            columns.

    Returns:
        DataFrame with columns ``month``, ``start_station_name`` and
        ``trip_count``. Stations tied for first place in a month are all
        returned, because ``dense_rank`` gives them the same rank.
    """
    # Trips per (month, station) pair.
    monthly_counts = (
        df1.withColumn("month", month("started_at"))
        .groupBy("month", "start_station_name")
        .agg(count("*").alias("trip_count"))
    )

    # Rank stations inside each month, busiest first, and keep rank 1.
    busiest_first = Window.partitionBy("month").orderBy(desc("trip_count"))
    leaders = monthly_counts.withColumn(
        "rank", dense_rank().over(busiest_first)
    ).filter(col("rank") == 1)

    return leaders.select("month", "start_station_name", "trip_count")

def calculate_top_stations(df, num_stations=3, num_days=14):
    """Return the busiest departure stations per day for the most recent days.

    Args:
        df: Spark DataFrame with ``start_time`` and ``from_station_name``
            columns.
        num_stations: Keep stations whose per-day rank is at most this value.
            ``rank`` is used, so stations tied at the cutoff are all kept and
            a day may yield more than ``num_stations`` rows.
        num_days: Number of trailing calendar days to report, counted back
            from (and including) the latest ``start_date`` in ``df``. Pass
            ``None`` to report every day in the data.

    Returns:
        DataFrame with columns ``start_date``, ``from_station_name``,
        ``count`` and ``rank``.

    Note:
        When ``num_days`` is not ``None`` this runs one small Spark
        aggregation eagerly to find the latest start date.
    """
    df = df.withColumn("start_date", to_date("start_time"))

    if num_days is not None:
        # Dict-form agg avoids needing Spark's ``max`` (which would shadow
        # the builtin); the result is None when there are no dated rows.
        latest_date = df.agg({"start_date": "max"}).first()[0]
        if latest_date is not None:
            # Strictly greater than the cutoff keeps exactly ``num_days``
            # calendar days, the latest date included.
            cutoff = latest_date - timedelta(days=num_days)
            df = df.filter(col("start_date") > cutoff)

    grouped_data = df.groupBy("start_date", "from_station_name").count()

    # Rank the stations within each day based on trip count
    window_spec = Window.partitionBy("start_date").orderBy(col("count").desc())
    ranked_data = grouped_data.withColumn("rank", rank().over(window_spec))

    # Keep the requested number of top-ranked stations for each day
    return ranked_data.filter(col("rank") <= num_stations)

def calculate_average_trip_duration_by_gender(df2):
    """Return the mean ``tripduration`` per gender after filling null genders.

    Each row's ``gender`` is replaced by the last non-null ``gender`` seen
    among rows ordered by ``start_time`` up to the current row. Rows that
    come before any non-null gender keep a null value.

    Args:
        df2: Spark DataFrame with ``start_time``, ``gender`` and
            ``tripduration`` columns.

    Returns:
        DataFrame with one row per ``gender`` and an ``avg_trip_duration``
        column.
    """
    # NOTE(review): the window has no partitionBy, so Spark will likely move
    # all rows to a single partition for this step — confirm this is
    # acceptable for the data size. Filling a missing gender from an
    # unrelated earlier trip also skews the averages; verify it is intended.
    chronological = Window.orderBy("start_time")
    filled_gender = last("gender", ignorenulls=True).over(chronological)

    by_gender = df2.withColumn("gender", filled_gender).groupBy("gender")
    return by_gender.agg(avg("tripduration").alias("avg_trip_duration"))

def calculate_top_10_longest_and_shortest_trips(df2):
    """Return the birth years with the highest and lowest mean trip duration.

    Despite the name, the results rank birth-year groups rather than
    individual trips: ``tripduration`` is divided by 60, averaged per
    ``birthyear``, and the ten largest and ten smallest averages are kept.

    Args:
        df2: Spark DataFrame with ``birthyear`` and ``tripduration`` columns.

    Returns:
        Tuple ``(top_10_longest_trips, top_10_shortest_trips)``; each is a
        DataFrame with columns ``birthyear`` and ``avg_trip_duration``.
    """
    typed = df2.withColumn("birthyear", col("birthyear").cast(IntegerType()))
    # presumably tripduration is in seconds, making this minutes — confirm.
    in_minutes = typed.withColumn("tripduration_minutes", col("tripduration") / 60)

    mean_by_year = in_minutes.groupBy("birthyear").agg(
        avg("tripduration_minutes").alias("avg_trip_duration")
    )

    duration = col("avg_trip_duration")
    longest = mean_by_year.orderBy(duration.desc()).limit(10)
    shortest = mean_by_year.orderBy(duration.asc()).limit(10)
    return longest, shortest

# Obtain the active SparkSession, creating one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# df1 and df2 are not loaded here; they must already exist at this point
# (this file reads them from CSV before the function definitions).

# Run every analysis; each result is a Spark DataFrame.
average_duration_per_day = calculate_average_duration_per_day(df2)
number_of_trips_per_day = calculate_number_of_trips_per_day(df2)
most_popular_start_station = calculate_most_popular_start_station(df1)
top_stations = calculate_top_stations(df2, num_stations=3, num_days=14)
average_trip_duration = calculate_average_trip_duration_by_gender(df2)
top_10_longest_trips, top_10_shortest_trips = calculate_top_10_longest_and_shortest_trips(df2)

# Write each result as CSV with a header row, replacing any previous output.
reports = [
    (average_duration_per_day, "dbfs:/FileStore/reports/average_duration_per_day.csv"),
    (number_of_trips_per_day, "dbfs:/FileStore/reports/number_of_trips_per_day.csv"),
    (most_popular_start_station, "dbfs:/FileStore/reports/most_popular_start_station.csv"),
    (top_stations, "dbfs:/FileStore/reports/top_stations.csv"),
    (average_trip_duration, "dbfs:/FileStore/reports/average_trip_duration.csv"),
    (top_10_longest_trips, "dbfs:/FileStore/reports/top_10_longest_trips.csv"),
    (top_10_shortest_trips, "dbfs:/FileStore/reports/top_10_shortest_trips.csv"),
]
for report_df, destination in reports:
    report_df.write.csv(destination, header=True, mode="overwrite")
