# EDA Spark SQL
by Estera Kot

In [None]:
import time
from pyspark.sql.functions import col, avg, max, when
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, avg, when
import time
import random
import string

def generate_table_name(length=10):
    """Return a random name made of ``length`` lowercase ASCII letters.

    Args:
        length: Number of characters in the generated name (default 10).

    Returns:
        A string of ``length`` characters drawn from ``string.ascii_lowercase``
        with ``random.choice``; the empty string when ``length`` is 0.
    """
    picked = []
    for _ in range(length):
        # One independent draw per character.
        picked.append(random.choice(string.ascii_lowercase))
    return "".join(picked)

def clean_tables(table_name=None):
    """Drop the given tables through the notebook's global ``spark`` session.

    Args:
        table_name: An iterable of table names, a single table name given as
            a string, or ``None``. ``None`` (the default) and an empty
            iterable drop nothing.

    Returns:
        None.

    One ``drop table if exists`` statement is issued per name. The name is
    concatenated into the SQL text unescaped, so only pass names this code
    generated itself.
    """
    if table_name is None:
        return
    if isinstance(table_name, str):
        # Iterating a bare string would issue one DROP per character.
        table_name = [table_name]
    for table in table_name:
        spark.sql("\n        drop table if exists " + table)

def adjust_columns_names(df):
    """Return ``df`` with underscores and parentheses removed from column names.

    Args:
        df: An object exposing ``columns`` (iterable of names) and
            ``withColumnRenamed(old, new)`` that returns the renamed frame.

    Returns:
        The frame after every column has been renamed. A frame with no
        columns is returned unchanged.
    """
    for col_name in df.columns:
        # Strip the characters produced by aggregate expressions such as
        # "count(1)" as well as underscores.
        new_col_name = col_name.replace("_", "").replace("(", "").replace(")", "")
        if new_col_name != col_name:
            df = df.withColumnRenamed(col_name, new_col_name)
    # Return only after ALL columns were processed (returning inside the loop
    # would rename just the first column and yield None for an empty frame).
    return df


# (progress label, SQL text) pairs for the aggregate steps. Each query result
# is passed through adjust_columns_names() and saved to a scratch Delta table.
# NOTE(review): the label "Step 12" appears twice in the original numbering;
# labels are kept as-is so existing logs stay comparable.
_AGGREGATE_STEPS = (
    ("Step 4", "SELECT COUNT(*) FROM true_big_table"),
    ("Step 5", "SELECT AVG(trip_distance) FROM true_big_table"),
    ("Step 6", "SELECT MAX(fare_amount) FROM true_big_table"),
    ("Step 7", "SELECT VendorID, COUNT(*) AS trip_count FROM true_big_table GROUP BY VendorID"),
    ("Step 8", "SELECT VendorID, AVG(fare_amount) AS avg_fare FROM true_big_table GROUP BY VendorID"),
    ("Step 9", "SELECT VendorID, SUM(tip_amount) AS total_tips FROM true_big_table GROUP BY VendorID"),
    ("Step 10", "SELECT AVG(passenger_count) FROM true_big_table"),
    ("Step 11", "SELECT AVG(total_amount) FROM true_big_table"),
    ("Step 12", "SELECT payment_type, COUNT(*) AS trip_count FROM true_big_table GROUP BY payment_type"),
    ("Step 12", "SELECT RateCodeID, AVG(fare_amount) AS avg_fare, AVG(tip_amount) AS avg_tip FROM true_big_table GROUP BY RateCodeID"),
    ("Step 13", "SELECT passenger_count, COUNT(*) AS trip_count FROM true_big_table GROUP BY passenger_count"),
    ("Step 14", """
    SELECT HOUR(tpep_pickup_datetime) AS pickup_hour, COUNT(*) AS trip_count
    FROM true_big_table
    GROUP BY pickup_hour
    ORDER BY pickup_hour
    """),
    ("Step 15", """
    SELECT VendorID, AVG(fare_amount) AS avg_fare
    FROM true_big_table
    GROUP BY VendorID
    ORDER BY avg_fare DESC
    """),
    ("Step 16", """
    SELECT CORR(trip_distance, fare_amount) AS correlation
    FROM true_big_table
    """),
    ("Step 17", """
    SELECT payment_type, AVG(passenger_count) AS avg_passenger_count
    FROM true_big_table
    GROUP BY payment_type
    """),
    ("Step 18", """
    SELECT PERCENTILE(trip_distance, 0.5) AS median_trip_distance
    FROM true_big_table
    """),
    ("Step 19", """
    SELECT
        sub.VendorID,
        sub.avg_fare
    FROM
        (SELECT
            VendorID,
            AVG(fare_amount) AS avg_fare
        FROM true_big_table
        GROUP BY VendorID) AS sub
    WHERE
        sub.avg_fare > (SELECT AVG(fare_amount) FROM true_big_table)
    ORDER BY
        sub.avg_fare DESC
    """),
    ("Step 20", """
    SELECT
        VendorID,
        RateCodeID,
        AVG(tip_amount) AS avg_tip
    FROM true_big_table
    GROUP BY
        VendorID, RateCodeID
    ORDER BY
        avg_tip DESC
    """),
    ("Step 21", """
    SELECT
        tpep_pickup_datetime,
        fare_amount,
        AVG(fare_amount) OVER (ORDER BY DATE(tpep_pickup_datetime) ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS moving_average
    FROM true_big_table
    ORDER BY tpep_pickup_datetime
    LIMIT 1
    """),
    ("Step 22", """
    SELECT
        VendorID,
        DATE(tpep_pickup_datetime) AS date,
        SUM(total_amount) AS total_daily_amount
    FROM true_big_table
    GROUP BY
        VendorID,
        date
    ORDER BY
        VendorID,
        date
    """),
    ("Step 23", """
    SELECT
        VendorID,
        SUM(total_amount) AS total_amount
    FROM
        (SELECT
            *,
            PERCENT_RANK() OVER(ORDER BY fare_amount DESC) AS rank
        FROM true_big_table) AS sub
    WHERE
        rank <= 0.10
    GROUP BY
        VendorID
    ORDER BY
        total_amount DESC
    LIMIT 1
    """),
    ("Step 24", """
    SELECT
        passenger_count,
        AVG(trip_distance) AS avg_trip_distance
    FROM true_big_table
    WHERE fare_amount > (SELECT AVG(fare_amount) FROM true_big_table)
    GROUP BY
        passenger_count
    ORDER BY
        passenger_count
    """),
)


def _segment_query(segments):
    """Build the SQL that adds a ``distance_segment`` label to every trip.

    Args:
        segments: Sequence of at least five numeric upper bounds; only the
            first five entries are used.

    Returns:
        The formatted SQL text selecting from ``Bronze.true_big_table``.
    """
    return """
    SELECT *,
    CASE
        WHEN trip_distance IS NULL THEN NULL
        WHEN trip_distance <= {} THEN "<= {}"
        WHEN trip_distance <= {} THEN "{} - {}"
        WHEN trip_distance <= {} THEN "{} - {}"
        WHEN trip_distance <= {} THEN "{} - {}"
        WHEN trip_distance <= {} THEN "{} - {}"
        ELSE ">= {}"
    END AS distance_segment
    FROM Bronze.true_big_table
    """.format(segments[0], segments[0],
               segments[1], str(segments[0]+0.01), str(segments[1]),
               segments[2], str(segments[1]+0.01), str(segments[2]),
               segments[3], str(segments[2]+0.01), str(segments[3]),
               segments[4], str(segments[3]+0.01), str(segments[4]),
               str(segments[4]+0.01))


def _save_as_scratch_table(df, tables):
    """Write ``df`` to a randomly named Delta table and record the name in ``tables``."""
    table_name = generate_table_name()
    # Record the name before writing so a failed or partial write is still
    # covered by the later cleanup.
    tables.append(table_name)
    df.write.format("delta").mode("overwrite").saveAsTable(table_name)


def _run_queries(tables):
    """Execute every benchmark step, appending each created table name to ``tables``."""
    # TODO parametrize your delta table e.g., name_of_your_lakehouse.delta_table_name
    spark.sql("SELECT * FROM Bronze.yellowtripdata201501").count()

    print("Step 1")
    df_with_segment = spark.sql(_segment_query([0, 5, 10, 15, 20, 25]))
    _save_as_scratch_table(df_with_segment, tables)

    print("Step 2")
    # Calculate average fare_amount and tip_amount per segment
    # Adjust these values as per your requirements
    df_with_segment = spark.sql(_segment_query([2, 5, 10, 20, 50]))
    df_with_segment.count()
    _save_as_scratch_table(df_with_segment, tables)

    # NOTE(review): nothing in this function writes Bronze.segmented_table
    # (the segmented frames go to random scratch tables), so the queries
    # below presumably rely on a pre-existing table - verify.
    query_avg = """
    SELECT distance_segment,
        AVG(fare_amount) AS avg_fare_amount,
        AVG(tip_amount) AS avg_tip_amount
    FROM Bronze.segmented_table
    GROUP BY distance_segment
    """
    avg_amounts_by_segment = spark.sql(query_avg)
    avg_amounts_by_segment.count()
    _save_as_scratch_table(avg_amounts_by_segment, tables)

    print("Step 3")
    # Calculate average fare_amount, tip_amount, and maximum values per segment
    amounts_by_segment = spark.sql("""
    SELECT
        distance_segment,
        AVG(fare_amount) AS avg_fare_amount,
        AVG(tip_amount) AS avg_tip_amount,
        MAX(fare_amount) AS max_fare_amount,
        MAX(tip_amount) AS max_tip_amount
    FROM segmented_table
    GROUP BY distance_segment
    """)
    amounts_by_segment.count()
    _save_as_scratch_table(amounts_by_segment, tables)

    for label, sql_text in _AGGREGATE_STEPS:
        print(label)
        result = spark.sql(sql_text)
        _save_as_scratch_table(adjust_columns_names(result), tables)


def queries(pause: bool):
    """Run the EDA benchmark queries, then drop the scratch tables they created.

    Args:
        pause: When true, sleep 200 seconds after the last query before the
            cleanup; otherwise sleep 1 second.

    The cleanup runs in a ``finally`` clause, so tables created before a
    failing step are dropped as well; the original exception still propagates.
    """
    tables = []
    try:
        _run_queries(tables)
        time.sleep(200 if pause else 1)
    finally:
        clean_tables(tables)



In [None]:
# queries() drops every scratch table it created before returning, so no
# separate cleanup call is needed here.
queries(pause=False)