In [None]:
%%pyspark
blob_account_name = "dssdemomtbanksa"
blob_container_name = "taxidata"
from pyspark.sql import SparkSession

sc = SparkSession.builder.getOrCreate()
token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
blob_sas_token = token_library.getConnectionString("ConnBlob")

spark.conf.set(
    'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
    blob_sas_token)
df = spark.read.load('wasbs://taxidata@dssdemomtbanksa.blob.core.windows.net/GreenTaxis_201911.csv', format='csv'
## If header exists uncomment line below
, header=True
,inferSchema=True
)
display(df.limit(10))

In [None]:
df.printSchema

In [None]:
df.columns

In [None]:
display(df.describe())

In [None]:
display(df.describe("trip_distance","passenger_count"))

In [None]:
display(df,summary=True)

In [3]:
print ("Before filter = ", df.count())
df=df.filter("trip_distance >0.0")
print ("After filter = ",  df.count())                   


StatementMeta(Sparkpool, 7, 2, Finished, Available)

Before filter =  449500
After filter =  421018

In [4]:
print ("Before filter = ", df.count())
df=df.where("passanger_count>0.0" and "passenger_count<=5")
print ("After filter = ",  df.count())     

StatementMeta(Sparkpool, 7, 3, Finished, Available)

Before filter =  421018
After filter =  344149

In [5]:
df=df.filter(
    (df.lpep_pickup_datetime >='2019-11-01' ) & (df.lpep_pickup_datetime <'2019-12-01'))

StatementMeta(Sparkpool, 7, 4, Finished, Available)

In [None]:
display(df.limit(100))

In [None]:
df.columns

In [6]:
df=df.select(
                "lpep_pickup_datetime",
                "lpep_dropoff_datetime", 
                "RatecodeID",
 "PULocationID",
 "DOLocationID",
 "passenger_count",
 "trip_distance",
 "total_amount", 
 "payment_type" 
)

StatementMeta(Sparkpool, 7, 5, Finished, Available)

In [7]:
df=(df
    .withColumnRenamed("lpep_pickup_datetime","PickupTime")
    .withColumnRenamed("lpep_dropoff_datetime","DropTime")
    .withColumnRenamed( "PULocationID","PickupLocationId")
    .withColumnRenamed("DOLocationID","DropLocationId")
    .withColumnRenamed("passenger_count","PassengerCount")
    .withColumnRenamed("trip_distance","TripDistance")
    .withColumnRenamed("total_amount","TotalAmount")
    .withColumnRenamed("payment_type","PaymentType")
)

StatementMeta(Sparkpool, 7, 6, Finished, Available)

In [8]:
from pyspark.sql.functions import *
df=(df
     .withColumn("TripYear",year("PickupTime"))
     .withColumn("TripMonth", month("PickupTime"))
     .withColumn("TripDay", dayofmonth("PickupTime")))


StatementMeta(Sparkpool, 7, 7, Finished, Available)

In [44]:
df=df.\
  withColumn("TripType", 
    when(df.RatecodeID == 6,"SharedTrip")\
    .otherwise("SoloTrip")  )\
    .drop("RatecodeID")                             


StatementMeta(Sparkpool, 5, 9, Finished, Available)

In [9]:
df=df.withColumn("TripType", 
    when(df.RatecodeID == 6,"SharedTrip")\
    .otherwise("SoloTrip")  )\
    .drop("RatecodeID")                             


StatementMeta(Sparkpool, 7, 8, Finished, Available)

In [46]:
%%sql
Create Database TaxiDatabase

StatementMeta(Sparkpool, 5, 11, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

## Write Transformed Data in Data Lake

In [47]:
(df
    .write
    .partitionBy("TripYear","TripMonth","TripDay")
    .mode("overwrite")
    .parquet("abfss://taxidata@dssdatalake.dfs.core.windows.net/Facts/GreenTaxidata.parquet")
)

StatementMeta(Sparkpool, 5, 12, Finished, Available)

## Write Transformed Data as managed Table

In [10]:
(df
    .write
    .partitionBy("TripYear","TripMonth","TripDay")
    .mode("overwrite")
    .saveAsTable("TaxiDatabase.GreenTaxiManaged")
)

StatementMeta(Sparkpool, 7, 9, Finished, Available)

## Write Transformed Data as unmanaged Table

In [11]:
(df
    .write
    .partitionBy("TripYear","TripMonth","TripDay")
    .mode("overwrite")
    .option("path","abfss://taxidata@dssdatalake.dfs.core.windows.net/Facts/GreenTaxidata1.parquet")
    .saveAsTable("TaxiDatabase.GreenTaxiUnManaged")
)

StatementMeta(Sparkpool, 7, 10, Finished, Available)