In [1]:
import os

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
from pyspark import SparkConf, SparkContext

In [2]:
# Spark configuration: run locally on all available cores ("local[*]"),
# with 4 GB for the driver, 4 GB per executor and 2 cores per executor.
# No SparkContext is created here; `conf` is handed to the session builder
# in the next cell, which creates (or reuses) the context.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("MyApp")
    .set("spark.driver.memory", "4g")
    .set("spark.executor.memory", "4g")
    .set("spark.executor.cores", "2")
)

In [3]:
# Build the SparkSession from `conf`; getOrCreate() returns the already
# running session if there is one instead of starting a second.
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()

In [4]:
# Echo the session object so the notebook shows its summary; this is a quick
# visual check that the session came up with the expected settings.
spark

In [5]:
# Switch Spark SQL's date/time parsing to the legacy parser.
# NOTE(review): presumably needed so the later to_date(..., "MM/dd/yyyy") call
# accepts dates that are not zero-padded (e.g. "4/1/2014") - confirm against
# the raw file.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [6]:
# Load the April 2014 Uber pickup CSV; the first row is used as the header.
# No schema inference is requested, so columns are cast explicitly further down.
uber1 = (
    spark.read
    .option("header", True)
    .csv("C:\\Users\\USER\\TransTech Project\\uber-raw-data-apr14.csv")
)

In [7]:
# Load the TLC affiliated-base lookup CSV (not the Uber trips file); the first
# row is used as the header. It is joined to the trips on the base code later.
base = (
    spark.read
    .option("header", True)
    .csv("C:\\Users\\USER\\TransTech Project\\TLC Affiliated Base.csv")
)

In [8]:
# Rename the raw Uber columns so they line up with the naming used by the
# other datasets. withColumnRenamed is a no-op for a column that is absent.
columns_mapping = {
    "Date/Time": "Date_time",
    "Lat": "Latitude",
    "Lon": "Longitude",
}

for old_name in columns_mapping:
    new_name = columns_mapping[old_name]
    uber1 = uber1.withColumnRenamed(old_name, new_name)

In [9]:
# Split the combined "Date_time" string on the space into a date part
# (item 0) and a time part (item 1).
#
# The column is referenced by the exact name the rename cell gave it
# ("Date_time"); the upper-case spelling "DATE_TIME" only resolved because
# Spark's analyzer is case-insensitive by default. The source column is left
# untouched rather than overwritten with an array, so this cell can be re-run
# safely; the combined column is removed by the drop in the next cell.
date_time_parts = split(col("Date_time"), " ")

uber1 = (
    uber1
    .withColumn("Date", date_time_parts.getItem(0))
    .withColumn("Time", date_time_parts.getItem(1))
)

In [10]:
# The combined date/time column is no longer needed once Date and Time have
# been extracted from it, so remove it. drop() ignores names that do not exist.
uber1 = uber1.drop("DATE_TIME")

In [11]:
# Parse the "Date" string into a proper DateType column using the
# month/day/year pattern; values that cannot be parsed become null.
parsed_date = to_date(col("Date"), "MM/dd/yyyy")
uber1 = uber1.withColumn("Date", parsed_date)

In [12]:
# NOTE(review): this statement is a no-op - it re-assigns "Time" to itself, so
# the column stays a string exactly as it came out of the split. The 24-hour
# conversion this cell was meant to perform is not implemented.
# TODO: confirm the raw time format and either add the conversion or delete
# this cell.
uber1 = uber1.withColumn("Time", col("Time"))

In [13]:
# The CSV was read without a schema, so the coordinates are strings; cast both
# to double precision. Values that are not numeric become null.
latitude_as_double = col("Latitude").cast("double")
longitude_as_double = col("Longitude").cast("double")

uber1 = uber1.withColumn("Latitude", latitude_as_double)
uber1 = uber1.withColumn("Longitude", longitude_as_double)

In [14]:
# Tag every row with a constant "Company" value so the rows remain
# identifiable by company name.
company_label = lit("Uber")
uber1 = uber1.withColumn("Company", company_label)

In [15]:
# Enrich each trip with its base details by matching the trip's "Base" value to
# "Base_Code" in the lookup table. This is an inner join, so trips whose base
# has no match in `base` are dropped.
base_code_matches = uber1["Base"] == base["Base_Code"]
uber1 = uber1.join(base, on=base_code_matches, how="inner")

In [16]:
# Keep only the columns needed downstream, in their final order; every other
# column present after the join is discarded here.
output_columns = [
    "Date",
    "Time",
    "Company",
    "Base_Code",
    "Base_Name",
    "Base_Region",
    "Latitude",
    "Longitude",
]
uber1 = uber1.selectExpr(*output_columns)

In [17]:
# Discard every row that has a null in any column (including dates or
# coordinates that failed to parse in the earlier cells).
uber1 = uber1.na.drop(how="any")

In [18]:
# Remove rows that are exact duplicates across all columns.
uber1 = uber1.distinct()

In [19]:
# Mark the cleaned DataFrame for caching. cache() is lazy: nothing is computed
# or stored until the first action runs (here, the JDBC write below). The bare
# call is the last expression in the cell, so the notebook echoes the schema.
# NOTE(review): the cache only pays off if `uber1` is used by more than one
# action - confirm later cells reuse it, otherwise this can be removed.
uber1.cache()

DataFrame[Date: date, Time: string, Company: string, Base_Code: string, Base_Name: string, Base_Region: string, Latitude: double, Longitude: double]

In [20]:
# Append the cleaned trips to the "uber2014" table of the local PostgreSQL
# database over JDBC, sending rows in batches of 1000.
#
# The password is read from the PGPASSWORD environment variable so that the
# real credential never has to be stored in the notebook; the previous literal
# is kept only as a fallback so existing setups behave as before.
#
# NOTE: mode("append") adds the rows again on every run of this cell, so
# re-running the notebook duplicates data in the target table.
pg_password = os.environ.get("PGPASSWORD", "*********")

(
    uber1.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/postgis_33_sample")
    .option("dbtable", "uber2014")
    .option("user", "postgres")
    .option("password", pg_password)
    .option("batchsize", 1000)
    .mode("append")
    .save()
)