In [0]:
from pyspark.sql import SparkSession
import pandas as pd

In [0]:
from pyspark.sql import SparkSession

# Create or get SparkSession (assuming you already have it created)
spark = SparkSession.builder.getOrCreate()

# Replace with the actual name of your table
table_name = "default.uber_data_4_csv"  # Update this with your table name

# Load the DataFrame from the table
df = spark.table(table_name)

# Convert Spark DataFrame to pandas DataFrame (assuming Arrow is enabled)
pandas_df = df.toPandas()

# ... (rest of your code using pandas_df)

In [0]:
from pyspark.sql.functions import col, to_timestamp
df = df.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime")))
df = df.withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))



In [0]:
df = df.drop_duplicates()
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("trip_id", monotonically_increasing_id())

In [0]:
from pyspark.sql.functions import col, year, month, dayofmonth, hour, dayofweek, monotonically_increasing_id

# Create datetime_dim DataFrame
datetime_dim = df.select(col("tpep_pickup_datetime"), col("tpep_dropoff_datetime"))

# Drop duplicates
datetime_dim = datetime_dim.dropDuplicates()

# Extract datetime components for pickup
datetime_dim = datetime_dim.withColumn("pick_hour", hour(col("tpep_pickup_datetime")))
datetime_dim = datetime_dim.withColumn("pick_day", dayofmonth(col("tpep_pickup_datetime")))
datetime_dim = datetime_dim.withColumn("pick_month", month(col("tpep_pickup_datetime")))
datetime_dim = datetime_dim.withColumn("pick_year", year(col("tpep_pickup_datetime")))
datetime_dim = datetime_dim.withColumn("pick_weekday", dayofweek(col("tpep_pickup_datetime")))


# Extract datetime components for dropoff (similar operations)
datetime_dim = datetime_dim.withColumn("drop_hour", hour(col("tpep_dropoff_datetime")))
datetime_dim = datetime_dim.withColumn("drop_day", dayofmonth(col("tpep_dropoff_datetime")))
datetime_dim = datetime_dim.withColumn("drop_month", month(col("tpep_dropoff_datetime")))
datetime_dim = datetime_dim.withColumn("drop_year", year(col("tpep_dropoff_datetime")))
datetime_dim = datetime_dim.withColumn("drop_weekday", dayofweek(col("tpep_dropoff_datetime")))

# Create datetime_id using index
datetime_dim = datetime_dim.withColumn("datetime_id", monotonically_increasing_id())

# Reorder columns (optional)
datetime_dim = datetime_dim.select("datetime_id", "tpep_pickup_datetime", "pick_hour", "pick_day", "pick_month", "pick_year", "pick_weekday", "tpep_dropoff_datetime", "drop_hour", "drop_day", "drop_month", "drop_year", "drop_weekday")

# Display the first few rows
datetime_dim.show()


+-----------+--------------------+---------+--------+----------+---------+------------+---------------------+---------+--------+----------+---------+------------+
|datetime_id|tpep_pickup_datetime|pick_hour|pick_day|pick_month|pick_year|pick_weekday|tpep_dropoff_datetime|drop_hour|drop_day|drop_month|drop_year|drop_weekday|
+-----------+--------------------+---------+--------+----------+---------+------------+---------------------+---------+--------+----------+---------+------------+
|          0| 2016-03-10 07:07:32|        7|      10|         3|     2016|           5|  2016-03-10 07:23:35|        7|      10|         3|     2016|           5|
|          1| 2016-03-10 07:07:56|        7|      10|         3|     2016|           5|  2016-03-10 07:22:02|        7|      10|         3|     2016|           5|
|          2| 2016-03-10 07:09:03|        7|      10|         3|     2016|           5|  2016-03-10 07:15:34|        7|      10|         3|     2016|           5|
|          3| 2016-03-

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Create passenger_count_dim
passenger_count_dim = df.select("passenger_count")  # Select only the passenger_count column
passenger_count_dim = passenger_count_dim.withColumn("passenger_count_id", monotonically_increasing_id())  # Add ID using index
passenger_count_dim = passenger_count_dim.select("passenger_count_id", "passenger_count")  # Reorder columns (optional)

# Create trip_distance_dim (similar approach)
trip_distance_dim = df.select("trip_distance")
trip_distance_dim = trip_distance_dim.withColumn("trip_distance_id", monotonically_increasing_id())
trip_distance_dim = trip_distance_dim.select("trip_distance_id", "trip_distance")

In [0]:
passenger_count_dim.show()

+------------------+---------------+
|passenger_count_id|passenger_count|
+------------------+---------------+
|                 0|              1|
|                 1|              1|
|                 2|              1|
|                 3|              1|
|                 4|              1|
|                 5|              1|
|                 6|              2|
|                 7|              3|
|                 8|              5|
|                 9|              2|
|                10|              1|
|                11|              6|
|                12|              6|
|                13|              1|
|                14|              1|
|                15|              4|
|                16|              6|
|                17|              2|
|                18|              1|
|                19|              1|
+------------------+---------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import col, lit, when

# Define the rate code mapping as a dictionary
rate_code_map = {
  1: "Standard rate",
  2: "JFK",
  3: "Newark",
  4: "Nassau or Westchester",
  5: "Negotiated fare",
  6: "Group ride"
}

# Broadcast the rate code mapping dictionary for efficiency
rate_code_map_broadcast = spark.sparkContext.broadcast(rate_code_map)

# Create rate_code_dim
rate_code_dim = df.select("RatecodeID")  # Select only the RatecodeID column
rate_code_dim = rate_code_dim.withColumn("rate_code_id", monotonically_increasing_id())  # Add ID using index
rate_code_dim = rate_code_dim.withColumn("rate_code_name", 
                                         when(col("RatecodeID").isin(*rate_code_map.keys()), 
                                              lit(None)).otherwise(lit("Unknown")))

# Map rate code to name using UDF (User-Defined Function)
def map_rate_code(rate_code):
    return rate_code_map_broadcast.value.get(rate_code, "Unknown")

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

map_rate_code_udf = udf(map_rate_code, StringType())

rate_code_dim = rate_code_dim.withColumn("rate_code_name", map_rate_code_udf(col("RatecodeID")))

# Reorder columns (optional)
rate_code_dim = rate_code_dim.select("rate_code_id", "RatecodeID", "rate_code_name")

# Display rate_code_dim
rate_code_dim.show()

+------------+----------+--------------+
|rate_code_id|RatecodeID|rate_code_name|
+------------+----------+--------------+
|           0|         1| Standard rate|
|           1|         1| Standard rate|
|           2|         1| Standard rate|
|           3|         1| Standard rate|
|           4|         1| Standard rate|
|           5|         1| Standard rate|
|           6|         1| Standard rate|
|           7|         1| Standard rate|
|           8|         1| Standard rate|
|           9|         1| Standard rate|
|          10|         1| Standard rate|
|          11|         1| Standard rate|
|          12|         1| Standard rate|
|          13|         1| Standard rate|
|          14|         1| Standard rate|
|          15|         1| Standard rate|
|          16|         1| Standard rate|
|          17|         1| Standard rate|
|          18|         1| Standard rate|
|          19|         1| Standard rate|
+------------+----------+--------------+
only showing top

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id, lit, when
from pyspark.sql import SparkSession

payment_type_name = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}

# Broadcast the payment type mapping dictionary for efficiency
payment_type_name_broadcast = spark.sparkContext.broadcast(payment_type_name)

# Create payment_type_dim
payment_type_dim = df.select("payment_type")  # Select only the payment_type column
payment_type_dim = payment_type_dim.withColumn("payment_type_id", monotonically_increasing_id())  # Add ID using index

# Map payment type to name using UDF (User-Defined Function)
def map_payment_type(payment_type):
    return payment_type_name_broadcast.value.get(payment_type, "Unknown")

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

map_payment_type_udf = udf(map_payment_type, StringType())

payment_type_dim = payment_type_dim.withColumn("payment_type_name", map_payment_type_udf(col("payment_type")))

# Reorder columns (optional)
payment_type_dim = payment_type_dim.select("payment_type_id", "payment_type", "payment_type_name")

# Display the resulting DataFrame
payment_type_dim.show()

+---------------+------------+-----------------+
|payment_type_id|payment_type|payment_type_name|
+---------------+------------+-----------------+
|              0|           1|          Unknown|
|              1|           1|          Unknown|
|              2|           1|          Unknown|
|              3|           2|          Unknown|
|              4|           1|          Unknown|
|              5|           2|          Unknown|
|              6|           2|          Unknown|
|              7|           1|          Unknown|
|              8|           1|          Unknown|
|              9|           1|          Unknown|
|             10|           2|          Unknown|
|             11|           1|          Unknown|
|             12|           2|          Unknown|
|             13|           1|          Unknown|
|             14|           1|          Unknown|
|             15|           1|          Unknown|
|             16|           1|          Unknown|
|             17|   

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Create pickup_location_dim
pickup_location_dim = df.select("pickup_longitude", "pickup_latitude")  # Select only the desired columns
pickup_location_dim = pickup_location_dim.withColumn("pickup_location_id", monotonically_increasing_id())  # Add ID using index
pickup_location_dim = pickup_location_dim.select("pickup_location_id", "pickup_latitude", "pickup_longitude")  # Reorder columns (optional)

# Display pickup_location_dim
pickup_location_dim.show()

+------------------+---------------+----------------+
|pickup_location_id|pickup_latitude|pickup_longitude|
+------------------+---------------+----------------+
|                 0|       40.60926|      -74.651306|
|                 1|       40.74684|       -73.98588|
|                 2|      40.717075|       -73.99157|
|                 3|       40.75896|        -73.9843|
|                 4|       40.73461|       -73.99866|
|                 5|      40.755627|        -73.9906|
|                 6|       40.75195|      -73.975075|
|                 7|      40.771076|       -73.86641|
|                 8|       40.76581|       -73.92645|
|                 9|      40.760414|       -74.00295|
|                10|       40.76104|       -73.98288|
|                11|      40.767204|       -73.96251|
|                12|      40.780525|      -73.949036|
|                13|       40.77373|        -73.8708|
|                14|       40.76596|       -73.96334|
|                15|       4

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Create dropoff_location_dim
dropoff_location_dim = df.select("dropoff_longitude", "dropoff_latitude")  # Select only the desired columns
dropoff_location_dim = dropoff_location_dim.withColumn("dropoff_location_id", monotonically_increasing_id())  # Add ID using index
dropoff_location_dim = dropoff_location_dim.select("dropoff_location_id", "dropoff_latitude", "dropoff_longitude")  # Reorder columns (optional)

# Display dropoff_location_dim
dropoff_location_dim.show()


+-------------------+----------------+-----------------+
|dropoff_location_id|dropoff_latitude|dropoff_longitude|
+-------------------+----------------+-----------------+
|                  0|        40.60926|       -74.651306|
|                  1|        40.70756|        -74.00725|
|                  2|        40.72248|        -73.98078|
|                  3|        40.76184|        -73.99066|
|                  4|        40.76428|        -73.97527|
|                  5|       40.763397|        -73.97066|
|                  6|        40.74193|        -73.98673|
|                  7|        40.63352|        -74.01157|
|                  8|        40.73465|        -73.98324|
|                  9|        40.75079|        -73.98292|
|                 10|        40.75462|        -73.97627|
|                 11|       40.789665|        -73.96607|
|                 12|        40.79473|       -73.943756|
|                 13|        40.75993|        -73.97884|
|                 14|        40

In [0]:
from pyspark.sql import SparkSession

passenger_count_dim = df.select("trip_id").withColumnRenamed("trip_id", "passenger_count_id")
trip_distance_dim = df.select("trip_id").withColumnRenamed("trip_id", "trip_distance_id")
rate_code_dim = df.select("trip_id").withColumnRenamed("trip_id", "rate_code_id")
pickup_location_dim = df.select("trip_id").withColumnRenamed("trip_id", "pickup_location_id")
dropoff_location_dim = df.select("trip_id").withColumnRenamed("trip_id", "dropoff_location_id")
datetime_dim = df.select("trip_id").withColumnRenamed("trip_id", "datetime_id")
payment_type_dim = df.select("trip_id").withColumnRenamed("trip_id", "payment_type_id")

# Perform the joins to create the fact table
fact_table = df \
    .join(passenger_count_dim, df.trip_id == passenger_count_dim.passenger_count_id) \
    .join(trip_distance_dim, df.trip_id == trip_distance_dim.trip_distance_id) \
    .join(rate_code_dim, df.trip_id == rate_code_dim.rate_code_id) \
    .join(pickup_location_dim, df.trip_id == pickup_location_dim.pickup_location_id) \
    .join(dropoff_location_dim, df.trip_id == dropoff_location_dim.dropoff_location_id) \
    .join(datetime_dim, df.trip_id == datetime_dim.datetime_id) \
    .join(payment_type_dim, df.trip_id == payment_type_dim.payment_type_id) \
    .select(
        df.trip_id, df.VendorID, datetime_dim.datetime_id, passenger_count_dim.passenger_count_id,
        trip_distance_dim.trip_distance_id, rate_code_dim.rate_code_id, df.store_and_fwd_flag,
        pickup_location_dim.pickup_location_id, dropoff_location_dim.dropoff_location_id,
        payment_type_dim.payment_type_id, df.fare_amount, df.extra, df.mta_tax, df.tip_amount,
        df.tolls_amount, df.improvement_surcharge, df.total_amount
    )

# Display the resulting fact table
fact_table.show()

+-------+--------+-----------+------------------+----------------+------------+------------------+------------------+-------------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|trip_id|VendorID|datetime_id|passenger_count_id|trip_distance_id|rate_code_id|store_and_fwd_flag|pickup_location_id|dropoff_location_id|payment_type_id|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+-------+--------+-----------+------------------+----------------+------------+------------------+------------------+-------------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|      0|       2|          0|                 0|               0|           0|                 N|                 0|                  0|              0|        6.5|  0.5|    0.5|       1.0|         0.0|                  0.3|         8.8|
|      1|       2|          1|              

In [0]:
%sql
-- Create the fact_table
CREATE TABLE fact_table (
  trip_id INT,
  VendorID INT,
  datetime_id INT,
  passenger_count_id INT,
  trip_distance_id INT,
  rate_code_id INT,
  store_and_fwd_flag STRING,
  pickup_location_id INT,
  dropoff_location_id INT,
  payment_type_id INT,
  fare_amount DOUBLE,
  extra DOUBLE,
  mta_tax DOUBLE,
  tip_amount DOUBLE,
  tolls_amount DOUBLE,
  improvement_surcharge DOUBLE,
  total_amount DOUBLE
);

In [0]:
%sql
select * from default.dropoff_location_dim

dropoff_location_id,dropoff_latitude,dropoff_longitude


In [0]:
%sql
-- Assuming your tables (dimension and fact) already exist

-- 1. Create temporary views for dimension tables:
CREATE OR REPLACE TEMP VIEW passenger_count_dim AS
SELECT trip_id AS passenger_count_id, passenger_count
FROM passenger_count_dim;

CREATE OR REPLACE TEMP VIEW trip_distance_dim AS
SELECT trip_id AS trip_distance_id, trip_distance
FROM trip_distance_dim;

CREATE OR REPLACE TEMP VIEW rate_code_dim AS
SELECT trip_id AS rate_code_id, RatecodeID, rate_code_name
FROM rate_code_dim;

CREATE OR REPLACE TEMP VIEW payment_type_dim AS
SELECT trip_id AS payment_type_id, payment_type, payment_type_name
FROM payment_type_dim;

CREATE OR REPLACE TEMP VIEW pickup_location_dim AS
SELECT trip_id AS pickup_location_id, pickup_latitude, pickup_longitude
FROM pickup_location_dim;

CREATE OR REPLACE TEMP VIEW dropoff_location_dim AS
SELECT trip_id AS dropoff_location_id, dropoff_latitude, dropoff_longitude
FROM dropoff_location_dim;

CREATE OR REPLACE TEMP VIEW datetime_dim AS
SELECT trip_id AS datetime_id,
       tpep_pickup_datetime, pick_hour, pick_day, pick_month, pick_year, pick_weekday,
       tpep_dropoff_datetime, drop_hour, drop_day, drop_month, drop_year, drop_weekday
FROM datetime_dim;

-- 2. Join the tables:
SELECT f.trip_id,
       f.VendorID,
       dt.datetime_id,
       pcd.passenger_count_id,
       tdd.trip_distance_id,
       rcd.rate_code_id,
       f.store_and_fwd_flag,
       pld.pickup_location_id,
       dld.dropoff_location_id,
       ptd.payment_type_id,
       f.fare_amount,
       f.extra,
       f.mta_tax,
       f.tip_amount,
       f.tolls_amount,
       f.improvement_surcharge,
       f.total_amount
FROM fact_table f
INNER JOIN datetime_dim dt ON f.trip_id = dt.datetime_id
INNER JOIN passenger_count_dim pcd ON f.trip_id = pcd.passenger_count_id
INNER JOIN trip_distance_dim tdd ON f.trip_id = tdd.trip_distance_id
INNER JOIN rate_code_dim rcd ON f.trip_id = rcd.rate_code_id
INNER JOIN payment_type_dim ptd ON f.trip_id = ptd.payment_type_id
INNER JOIN pickup_location_dim pld ON f.trip_id = pld.pickup_location_id
INNER JOIN dropoff_location_dim dld ON f.trip_id = dld.dropoff_location_id;

-- Drop temporary views (optional)
DROP TEMPORARY VIEW IF EXISTS passenger_count_dim;
DROP TEMPORARY VIEW IF EXISTS trip_distance_dim;
DROP TEMPORARY VIEW IF EXISTS rate_code_dim;
DROP TEMPORARY VIEW IF EXISTS payment_type_dim;
DROP TEMPORARY VIEW IF EXISTS pickup_location_dim;
DROP TEMPORARY VIEW IF EXISTS dropoff_location_dim;
DROP TEMPORARY VIEW IF EXISTS datetime_dim;


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-546318218290649>:21[0m
[1;32m     19[0m     display(df)
[1;32m     20[0m     [38;5;28;01mreturn[39;00m df
[0;32m---> 21[0m   _sqldf [38;5;241m=[39m [43m____databricks_percent_sql[49m[43m([49m[43m)[49m
[1;32m     22[0m [38;5;28;01mfinally[39;00m:
[1;32m     23[0m   [38;5;28;01mdel[39;00m ____databricks_percent_sql

File [0;32m<command-546318218290649>:4[0m, in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m [38;5;28;01mdef[39;00m [38;5;21m____databricks_percent_sql[39m():
[1;32m      3[0m   [38;5;28;01mimport[39;00m [38;5;21;01mbase64[39;00m
[0;32m----> 4[0m   [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[43mbase64[49m[38;5;241;43m.[39;49m[43mstandard_b64decode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mLS0gQ

In [0]:
%sql
-- 2. Join the tables:
SELECT f.trip_id,
       f.VendorID,
       dt.datetime_id,
       pcd.passenger_count_id,
       tdd.trip_distance_id,
       rcd.rate_code_id,
       f.store_and_fwd_flag,
       pld.pickup_location_id,
       dld.dropoff_location_id,
       ptd.payment_type_id,
       f.fare_amount,
       f.extra,
       f.mta_tax,
       f.tip_amount,
       f.tolls_amount,
       f.improvement_surcharge,
       f.total_amount
FROM fact_table f
INNER JOIN datetime_dim dt ON f.trip_id = dt.datetime_id
INNER JOIN passenger_count_dim pcd ON f.trip_id = pcd.passenger_count_id
INNER JOIN trip_distance_dim tdd ON f.trip_id = tdd.trip_distance_id
INNER JOIN rate_code_dim rcd ON f.trip_id = rcd.rate_code_id
INNER JOIN payment_type_dim ptd ON f.trip_id = ptd.payment_type_id
INNER JOIN pickup_location_dim pld ON f.trip_id = pld.pickup_location_id
INNER JOIN dropoff_location_dim dld ON f.trip_id = dld.dropoff_location_id;

trip_id,VendorID,datetime_id,passenger_count_id,trip_distance_id,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type_id,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount


In [0]:
%sql
select * from fact_table


trip_id,VendorID,datetime_id,passenger_count_id,trip_distance_id,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type_id,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
