###1. Connect to Azure Data Lake using Access Key

In [None]:
spark.conf.set(
                  "fs.azure.account.key.*** Data Lake Name ***.dfs.core.windows.net",  

                  "*** Data Lake Access Key ***"
              )

###2. List files in Data Lake

In [None]:
display(
          dbutils.fs.ls("abfss://*** Container Name ***@*** Data Lake Name ***.dfs.core.windows.net/*** Path ***/")
       )

###3. Read file from Data Lake

In [None]:
filePath = "abfss://*** Container Name ***@*** Data Lake Name ***.dfs.core.windows.net/Raw/TaxiRides.csv"

In [None]:
# Read TaxiRides csv file to create DataFrame
from pyspark.sql.functions import *

taxiRidesDF = (
                  spark    
                    .read    

                    .option("header", "true")

                    .option("inferSchema", "true")

                    .csv(filePath)
              )

# Show DataFrame content
display( taxiRidesDF )

###4. Analyze Data

In [None]:
taxiRidesAnalyzedDF = (
                            taxiRidesDF.describe
                            (
                                "PassengerCount",
                                "TripDistance"
                            )
                      )

display(taxiRidesAnalyzedDF)

###5. Apply Transformations

####5.A. Filter Data

In [None]:
taxiRidesDF = (
                  taxiRidesDF
    
                      .where("PassengerCount > 0")

                      .where("TripDistance > 0.0")
              )

####5.B. Select Limited Columns

In [None]:
taxiRidesDF = (
                   taxiRidesDF

                        # Select only limited columns
                        .select(
                                  col("RideID").alias("ID"),
                             
                                  "PickupTime",
                                  "DropTime",
                                  "PickupLocationId",
                                  "DropLocationId",
                                  "PassengerCount",
                                  "TripDistance",
                                  "RateCodeId",
                                  "PaymentType",
                                  "TotalAmount"
                              )
              )

taxiRidesDF.printSchema()

####5.C. Create Derived Columns - TripYear, TripMonth, TripDay

In [None]:
taxiRidesDF = (
                  taxiRidesDF
    
                        .withColumn("TripYear"  , year(col("PickupTime")))

                        .withColumn("TripMonth" , month(col("PickupTime")))

                        .withColumn("TripDay"   , dayofmonth(col("PickupTime")))
              )

taxiRidesDF.printSchema()

####5.D. Create Derived Column - TripType

In [None]:
taxiRidesDF = (
                  taxiRidesDF
    
                        .withColumn("TripType", when(
                                                      col("RateCodeId") == 6,
                                                        "SharedTrip"
                                                    )
                                                .otherwise("SoloTrip")
                                   )
              )

taxiRidesDF.printSchema()

###6. Save Data in Parquet Format to Data Lake

In [None]:
(
    taxiRidesDF
            .write
    
            .mode("overwrite")
    
            .parquet("abfss://*** Container Name ***@*** Data Lake Name ***.dfs.core.windows.net/Output/TaxiRides.parquet")
)