# 1. Access the lakehouse

![image-alt-text](https://github.com/ekote/azure-architect/blob/master/images/lakehouse_overview.gif?raw=true)


# 2. Lakehouse artifact overview

![image-alt-text](https://github.com/ekote/azure-architect/blob/master/images/2_30frames.gif?raw=true)

# 3. Get data from the lakehouse

![image-alt-text](https://github.com/ekote/azure-architect/blob/master/images/3.gif?raw=true)

`df = spark.sql("SELECT * FROM lab140lakehouse.nyc_taxi LIMIT 1000")` - This line of code uses the `spark.sql()` function to run an SQL query on a table called `nyc_taxi` located in the lakehouse `lab140lakehouse`. The query selects all columns `(*)` from the table and limits the result to the first 1000 rows with the `LIMIT 1000` clause. The result of the query is then stored in a PySpark DataFrame called `df`.

`display(df)` - the `display()` function is used to visualize the contents of a DataFrame in a tabular format. In this case, it visualizes the contents of the df DataFrame created in the previous line.

In [None]:
df = spark.sql("SELECT * FROM lab140lakehouse.nyc_taxi LIMIT 1000")

display(df)

The output of `df.printSchema()` displays the name of each column in the DataFrame, the data type of each column, and whether null values are allowed. This information can be useful for understanding the structure of the data in the DataFrame, and for performing operations on the data.

In [None]:
df.printSchema()

`df.show(5)` prints the first five rows of the DataFrame `df`; the number in parentheses sets how many rows to display. This is a quick way to inspect a large dataset and confirm that it loaded correctly. Each row is printed on its own line, with columns separated by vertical bars.

In [None]:
df.show(5)

When working with data, one of the initial tasks is to read it into the environment for analysis. Once the data is loaded, basic analysis such as filtering, sorting, and aggregating can be performed. However, as the scale and complexity of the data increase, there is a need for more advanced data engineering scenarios such as data cleansing, transformation, and aggregation. 

Congratulations on completing the first part of the sample. Let us now move on to custom scenarios.

# 4. Data cleaning and transformation

In this scenario, the data engineer could perform some data cleaning and transformation tasks to prepare the data for downstream analysis. 

Objective: **Cleanse the data and filter out invalid records for further analysis.**

In this scenario, we aim to demonstrate how data engineers can perform data cleansing and filtering on a large dataset. We begin by loading the data from the source and then filter out records where the trip distance or the fare amount is less than or equal to zero, since such records are invalid.

Next, we cleanse the data by converting the `storeAndFwdFlag` column to a boolean type and casting the `lpepPickupDatetime` and `lpepDropoffDatetime` columns to timestamp types. Finally, we display the cleansed data and write it to a Delta table named `nyc_taxi_cleansed`.

This scenario demonstrates the importance of data cleansing and filtering to ensure the data is accurate and valid before proceeding with further analysis.

In [None]:
from pyspark.sql.functions import col, when

# Load data from source
df = spark.read.load("Tables/nyc_taxi", header=True, inferSchema=True)
df_count = df.count()

# Remove invalid records
df = df.filter((col("tripDistance") > 0) & (col("fareAmount") > 0))
df_count_after_cleaning = df.count()

number_of_deleted_records = df_count - df_count_after_cleaning

print(f"Removed {number_of_deleted_records} records")

# Cleanse data
df = df.withColumn("storeAndFwdFlag", when(col("storeAndFwdFlag") == "Y", True).otherwise(False))
df = df.withColumn("lpepPickupDatetime", col("lpepPickupDatetime").cast("timestamp"))
df = df.withColumn("lpepDropoffDatetime", col("lpepDropoffDatetime").cast("timestamp"))

# Display cleansed data
display(df)

# Write cleansed data to destination
df.write.format("delta").mode("overwrite").saveAsTable("nyc_taxi_cleansed")
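The filter above keeps a row only when both conditions hold, so a row is dropped as soon as either value is zero or negative. A minimal plain-Python sketch of the same rule, using a few made-up rows (the sample values are hypothetical and no Spark session is needed):

```python
# Hypothetical sample rows; the keys mirror the notebook's column names.
rows = [
    {"tripDistance": 2.5, "fareAmount": 12.0},   # valid
    {"tripDistance": 0.0, "fareAmount": 8.0},    # invalid: zero distance
    {"tripDistance": 1.2, "fareAmount": -3.0},   # invalid: negative fare
    {"tripDistance": 0.0, "fareAmount": 0.0},    # invalid: both
]


def is_valid(row):
    """Same rule as (col("tripDistance") > 0) & (col("fareAmount") > 0)."""
    return row["tripDistance"] > 0 and row["fareAmount"] > 0


kept = [row for row in rows if is_valid(row)]
removed = len(rows) - len(kept)
print(f"Removed {removed} records")
```

Only the first row survives, because each of the other three fails at least one of the two checks.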

### Refresh lakehouse to see the results

![image-alt-text](https://github.com/ekote/azure-architect/blob/master/images/4.gif?raw=true)

# 5. Exploratory data analysis (EDA)

Exploratory data analysis (EDA) is a common scenario for data engineers. EDA is the process of analyzing and understanding data to gain insights, identify patterns, and develop hypotheses for further investigation. In data engineering, EDA is often done to identify data quality issues, anomalies, or other problems that need to be addressed before data can be used for analysis or modeling. EDA can also help data engineers to understand the relationships between different data sources and determine the best way to join or transform them.

`df.count()` is a Spark DataFrame API function that returns the number of rows in the DataFrame. It is a convenient way to quickly determine the size of the DataFrame without having to iterate over all the rows manually. The function is an action in Spark, meaning it triggers a computation that counts the number of rows in the DataFrame and returns the result. It is useful for getting a quick overview of the data size and checking if any rows are missing or dropped during data processing. However, it should be used with caution on large datasets, as it can be a costly operation that requires significant computational resources.

In [None]:
# Load data from source
df = spark.read.load("Tables/nyc_taxi_cleansed", header=True, inferSchema=True)

# Count the number of rows 
df.count()

`df.dtypes` is an attribute of a DataFrame object that returns a list of tuples containing each column name and its data type. The data types are returned as plain strings such as `int`, `double`, or `timestamp`. To get the corresponding Spark SQL `DataType` objects instead, use `df.schema`.

In [None]:
# Display the data types of the columns.

df.dtypes

The code imports the `col` function from `pyspark.sql.functions` and uses it to reference the `"vendorID"` column of the Spark DataFrame `df`. The `groupBy()` method is called on the DataFrame with that column, which groups the rows by the distinct values of `"vendorID"`. The `count()` method is then applied to the grouped data to calculate the number of records in each group. Finally, the `show()` method prints the resulting DataFrame to the console.

In [None]:
# Group the data by 'vendorID' and count the number of rows in each group.

from pyspark.sql.functions import col

df.groupBy(col("vendorID")).count().show()

The code uses the `min` and `max` functions from `pyspark.sql.functions` to select the earliest and latest pickup timestamps in the Spark DataFrame `df`, which contains information about NYC taxi trips. `collect()` returns the one-row result to the driver and `[0][0]` extracts the value, which is stored in `oldest_day` and `latest_day` and then printed with `print()`. Note that importing `min` and `max` this way shadows Python's built-in functions of the same name for the rest of the notebook session.

In [None]:
# Retrieve information about the earliest and latest pickup dates in the dataset.

from pyspark.sql.functions import min, max

oldest_day = df.select(min("lpepPickupDatetime")).collect()[0][0]
latest_day = df.select(max("lpepPickupDatetime")).collect()[0][0]

print("Oldest pickup date: ", oldest_day)
print("Latest pickup date: ", latest_day)

This code uses the PySpark `date_format` function to group the `df` DataFrame by the year, month, and day of the `lpepPickupDatetime` column, and then counts the number of occurrences for each date.

`date_format` is a PySpark SQL function used to format the date or timestamp column to the specified format. In this code, the format used is `yyyy-MM-dd`. The alias `pickup_date` is assigned to the formatted date column, and the DataFrame is grouped by this column using the `groupby()` method. The `count()` method is then applied to count the number of occurrences of each pickup_date. Finally, the result is displayed using the `show()` method.

In [None]:
from pyspark.sql.functions import date_format

# group by year, month and day of lpepPickupDatetime
df_grouped = df.groupby(date_format('lpepPickupDatetime', 'yyyy-MM-dd').alias('pickup_date')).count()

# show the result
df_grouped.show()
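To make the grouping step concrete, here is a small plain-Python sketch of the same idea: format each timestamp as a day string, then count how often each string occurs. The timestamps are hypothetical, and `strftime("%Y-%m-%d")` stands in for `date_format(..., 'yyyy-MM-dd')`.

```python
from collections import Counter
from datetime import datetime

# Hypothetical pickup timestamps standing in for lpepPickupDatetime.
pickups = [
    datetime(2016, 1, 1, 0, 29),
    datetime(2016, 1, 1, 23, 59),
    datetime(2016, 1, 2, 8, 15),
]

# Format each timestamp as a day string, then count occurrences per day.
trips_per_day = Counter(ts.strftime("%Y-%m-%d") for ts in pickups)

for pickup_date, count in sorted(trips_per_day.items()):
    print(pickup_date, count)
```

The two trips on 1 January fall into the same group even though their times differ, which is the purpose of formatting the timestamp before grouping.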

This code computes the minimum and maximum values of the `fareAmount` column in the Spark DataFrame `df`. It uses the `min()` and `max()` functions from the `pyspark.sql.functions` module (imported in the previous cell), and the `alias()` method renames the resulting columns to `"min"` and `"max"`. Finally, the `show()` method displays the resulting one-row DataFrame.

In [None]:
# min and max values of the target feature "fareAmount"

df.select(min('fareAmount').alias('min'), max('fareAmount').alias('max')).show()

This code performs a descriptive statistical analysis of the `fareAmount` column of the Spark DataFrame `df`. Specifically, it uses the `describe()` method of the DataFrame to compute the count, mean, standard deviation, minimum, and maximum.

The result of `describe()` is then converted to a Pandas DataFrame using the `toPandas()` method, which displays the statistics in a more readable table. Note that `describe()` does not report percentiles; to include the 25th, 50th, and 75th percentiles as well, use `summary()` instead. The resulting table provides insights into the central tendency and dispersion of `fareAmount`, and can be useful for understanding the distribution of the data and identifying potential outliers.

In [None]:
# General statistical characteristics of fare amount

df.select('fareAmount').describe().toPandas()
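As a rough illustration of what these five statistics mean, the following sketch computes them with the Python standard library on a handful of made-up fares. It assumes that the standard deviation reported by Spark is the sample standard deviation, which is what `statistics.stdev` computes. `builtins.min` and `builtins.max` are used explicitly so the sketch still works in a session where `min` and `max` were imported from `pyspark.sql.functions`.

```python
import builtins
import statistics

# Hypothetical fares; one large value acts as an outlier.
fares = [5.0, 7.5, 10.0, 12.5, 40.0]

summary = {
    "count": len(fares),
    "mean": statistics.mean(fares),
    "stddev": statistics.stdev(fares),   # sample standard deviation
    "min": builtins.min(fares),
    "max": builtins.max(fares),
}

for name, value in summary.items():
    print(f"{name:>6}: {value}")
```

The single outlier pulls the mean well above most of the values, which is the kind of signal this table helps to spot.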

This code computes approximate quantiles of the `fareAmount` column of the DataFrame `df` using the DataFrame's `approxQuantile()` method. The method takes three arguments: the name of the column for which quantiles are to be computed, the list of quantile probabilities to be returned, and a relative error. In this case, the probabilities are 0.1, 0.25, 0.5, 0.75, and 0.9, and the relative error is set to 0.01. A relative error of 0 requests exact quantiles, which can be expensive on large datasets. The method returns a Python list with one approximate quantile value per requested probability.

In [None]:
# quantiles

df.select('fareAmount').approxQuantile("fareAmount",[0.1, 0.25, 0.5, 0.75, 0.9], 0.01)
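For intuition, the sketch below computes exact quantiles on a small made-up list using one common rank-based definition: the value at rank `ceil(p * N)` in sorted order. This is an illustration only, not Spark's algorithm. With a relative error of 0.01, Spark may instead return a value whose rank is within roughly 1% of `N` of that target rank.

```python
import math


def exact_quantile(values, p):
    """Return the value at rank ceil(p * N) in sorted order (ranks start at 1)."""
    ordered = sorted(values)
    rank = math.ceil(p * len(ordered))
    if rank < 1:          # p == 0 maps to the smallest value
        rank = 1
    return ordered[rank - 1]


# Hypothetical fares.
fares = [3.0, 4.5, 6.0, 7.5, 9.0, 10.5, 12.0, 15.0, 20.0, 52.0]
probabilities = [0.1, 0.25, 0.5, 0.75, 0.9]

quantiles = [exact_quantile(fares, p) for p in probabilities]
print(quantiles)
```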

This code plots the distribution of `fareAmount` using the matplotlib library. The column is first selected from the Spark DataFrame with `select()` and `F.col()`, and the result is converted to a Pandas DataFrame with `toPandas()`, which collects all selected rows to the driver. The values are then plotted as a histogram with `plt.hist()`, using 50 bins set through the `bins` parameter. Finally, the title and axis labels are set with `title()`, `xlabel()`, and `ylabel()`, and the plot is displayed with `show()`.

In [None]:
import matplotlib.pyplot as plt
import pyspark.sql.functions as F

# Assuming your DataFrame is named `df`
fare_distribution = df.select(F.col('fareAmount')).toPandas()

# Plot histogram
plt.hist(fare_distribution['fareAmount'], bins=50)
plt.title('Distribution of fareAmount')
plt.xlabel('fareAmount')
plt.ylabel('Frequency')
plt.show()

This code snippet creates a scatter plot using matplotlib. It assumes that the Spark DataFrame `df` contains the columns `fareAmount` and `tripDistance`. First, the two columns are selected and converted to a Pandas DataFrame with `toPandas()`. Then a scatter plot is created with `ax.scatter()`, where the `x` and `y` arguments are the variables plotted on the x- and y-axes and the `alpha` argument controls the transparency of the points. The axis labels and title are set with `ax.set_xlabel()`, `ax.set_ylabel()`, and `ax.set_title()`, and the plot is displayed with `plt.show()`. The plot can be used to visualize the relationship between fare amount and trip distance.

In [None]:
import matplotlib.pyplot as plt

# assuming `df` is your Spark DataFrame containing the columns `fareAmount` and `tripDistance`

# convert Spark DataFrame to Pandas DataFrame
df_pd = df.select(['fareAmount', 'tripDistance']).toPandas()

# create scatter plot
fig, ax = plt.subplots()
ax.scatter(x=df_pd['tripDistance'], y=df_pd['fareAmount'], alpha=0.5)

# set axis labels and title
ax.set_xlabel('Trip Distance')
ax.set_ylabel('Fare Amount')
ax.set_title('Correlation between Fare Amount and Trip Distance')

# show the plot
plt.show()
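A scatter plot shows the relationship visually; a single number such as the Pearson correlation coefficient can complement it. The sketch below computes it in plain Python on made-up values. The helper `pearson` and the sample data are illustrative and not part of the lab.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Hypothetical trips where the fare grows linearly with the distance.
trip_distance = [1.0, 2.0, 3.0, 4.0, 5.0]
fare_amount = [5.0, 7.5, 10.0, 12.5, 15.0]

r = pearson(trip_distance, fare_amount)
print(f"Pearson r = {r:.3f}")
```

On the full dataset, Spark can compute the same coefficient without collecting the rows to the driver, for example with `df.stat.corr("tripDistance", "fareAmount")`.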


# 6. Data aggregation and summarization

In this scenario, the data engineer could aggregate and summarize the data to provide insights into the overall trends and patterns in the dataset. For example, they could group the data by some categorical columns (such as VendorID or RatecodeID) and calculate some summary statistics for the numerical columns (such as average fare_amount or total trip_distance). This could involve using Spark's built-in aggregation functions (such as groupBy and agg) to perform these calculations.

The code calculates the average fare amount per month by grouping the DataFrame `df` by the year and month of the `lpepPickupDatetime` column. It uses the `avg` function from the `pyspark.sql.functions` module to calculate the average fare amount and aliases the resulting column as `"average_fare"`. The resulting DataFrame `average_fare_per_month` is sorted by year and month and displayed using the `display` function. Finally, the code saves the results to a new Delta table named `average_fare_per_month`, using the `"delta"` format and `"overwrite"` mode.

In [None]:
from pyspark.sql.functions import col, year, month, dayofmonth, avg

# Calculate average fare amount per month
average_fare_per_month = (
    df
    .groupBy(year("lpepPickupDatetime").alias("year"), month("lpepPickupDatetime").alias("month"))
    .agg(avg("fareAmount").alias("average_fare"))
    .orderBy("year", "month")
)
display(average_fare_per_month)

# Save the results to a new delta table
average_fare_per_month.write.format("delta").mode("overwrite").saveAsTable("average_fare_per_month")

The code calculates the average fare amount per day by grouping the DataFrame `df` by the year, month, and day of the `lpepPickupDatetime` column. It uses the `avg` function from the `pyspark.sql.functions` module to calculate the average fare amount and aliases the resulting column as `"average_fare"`. The resulting DataFrame `average_fare_per_day` is sorted by year, month, and day and displayed using the `display` function. Finally, the code saves the results to a new Delta table named `average_fare_per_day`, using the `"delta"` format and `"overwrite"` mode.

In [None]:
from pyspark.sql.functions import col, year, month, dayofmonth, avg

# Calculate average fare amount per day
average_fare_per_day = (
    df
    .groupBy(year("lpepPickupDatetime").alias("year"), month("lpepPickupDatetime").alias("month"), dayofmonth("lpepPickupDatetime").alias("day"))
    .agg(avg("fareAmount").alias("average_fare"))
    .orderBy("year", "month", "day")
)
display(average_fare_per_day)

# Save the results to a new delta table
average_fare_per_day.write.format("delta").mode("overwrite").saveAsTable("average_fare_per_day")
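The group-then-average pattern used in both cells can be sketched in plain Python as follows: accumulate a running sum and a count per `(year, month)` key, then divide. The trips are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical (pickup timestamp, fare) pairs.
trips = [
    (datetime(2016, 1, 3), 10.0),
    (datetime(2016, 1, 20), 20.0),
    (datetime(2016, 2, 5), 9.0),
]

# (year, month) -> [sum of fares, number of trips]
totals = defaultdict(lambda: [0.0, 0])
for pickup, fare in trips:
    key = (pickup.year, pickup.month)
    totals[key][0] += fare
    totals[key][1] += 1

# Sort the keys like orderBy("year", "month") and compute the averages.
average_fare = {key: total / count for key, (total, count) in sorted(totals.items())}
print(average_fare)
```

Adding `pickup.day` to the key would give the per-day variant.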

# 7. Custom libraries & advanced visualisation

Libraries provide reusable code that Apache Spark developers may want to include in their Spark application.

Each workspace comes with a pre-installed set of libraries available in the Spark run-time and available to be used immediately in the notebook or Spark job definition. We refer to these as built-in libraries.

Based on the user scenarios and specific needs, you can include other libraries. There are two types of libraries you may want to include:

- Feed library: Feed libraries are the ones that come from public sources or repositories. You can install Python feed libraries from PyPI and Conda by specifying the source in the Library Management portals. You can also use a Conda environment specification .yml file to install libraries.

- Custom library: Custom libraries are code built by you or your organization. `.whl`, `.jar`, and `.tar.gz` files can be managed through the Library Management portals. Note that `.tar.gz` is supported only for R; use `.whl` for custom Python libraries.

| **Library name** | **Workspace update** | **In-line installation** |
|---|---|---|
| **Python Feed (PyPI & Conda)** | Supported | Supported |
| **Python Custom (.whl)** | Supported | Supported |
| **R Feed (CRAN)** | Not Supported | Supported |
| **R custom (.tar.gz)** | Supported | Supported |
| **Jar** | Supported | Not Supported |

## Install library

In [None]:
%pip install altair
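Before installing a library in-line, it can be useful to check whether it is already available in the session. A minimal sketch using only the standard library (the helper name `is_installed` is illustrative):

```python
import importlib.util


def is_installed(module_name):
    """Return True when the top-level module can be found on the current Python path."""
    return importlib.util.find_spec(module_name) is not None


print(is_installed("json"))    # standard library module
print(is_installed("altair"))  # depends on the environment
```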

## Create custom visualisation with new library

In [None]:
import altair as alt

df = spark.sql("SELECT * FROM lab140lakehouse.nyc_taxi_cleansed LIMIT 5000")

data = df.toPandas()

alt.Chart(data).mark_point().encode(
    x='tripDistance',
    y='fareAmount',
    color='paymentType:N',
    tooltip=['tripDistance', 'fareAmount', 'paymentType']
).interactive()

In [None]:
alt.Chart(data).mark_rect().encode(
    alt.X('tripDistance:Q', bin=True),
    alt.Y('fareAmount:Q', bin=True),
    color='count()',
    column='paymentType:N'
).interactive()


# 8. Shortcuts and final table 

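In this scenario, we create a shortcut to an external Azure Data Lake Storage (ADLS) Gen2 account, load a discounts file through that shortcut, unpivot it, join it with the taxi trip data, and save the result as a new Delta table.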



## Create shortcut to external ADLS Gen2
![image-alt-text](https://github.com/ekote/azure-architect/blob/master/images/5.gif?raw=true)



URL: `https://buildlab140ekot.dfs.core.windows.net/`

New Connection

Shared Access Signature

SAS: `?sv=2022-11-02&ss=bfqt&srt=sco&sp=rwlacupx&se=2023-06-01T10:49:05Z&st=2023-05-04T02:49:05Z&spr=https,http&sig=0e0%2BlbFhxx3lcyz79VF272PLEzd0UdyMD348iNvBvQQ%3D`

Name: `ExternalDataSideLoading`
Path: `/data`
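The SAS token is a URL query string, so its fields can be inspected with the standard library. The sketch below reuses the field layout of the token above, with the signature replaced by a placeholder. As a reading aid: `st` and `se` are the start and expiry times, `sp` lists the granted permissions, and `spr` lists the allowed protocols.

```python
from urllib.parse import parse_qs

# Same fields as the token above; the signature is a placeholder, not a real value.
sas_token = (
    "?sv=2022-11-02&ss=bfqt&srt=sco&sp=rwlacupx"
    "&se=2023-06-01T10:49:05Z&st=2023-05-04T02:49:05Z"
    "&spr=https,http&sig=PLACEHOLDER"
)

# parse_qs returns a list per key; each key appears once here, so keep the first value.
fields = {key: values[0] for key, values in parse_qs(sas_token.lstrip("?")).items()}

print("valid from: ", fields["st"])
print("valid until:", fields["se"])
print("permissions:", fields["sp"])
```

Checking the `se` field first is a quick way to rule out an expired token when the connection fails.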

## Load new data

![image-alt-text](https://github.com/ekote/azure-architect/blob/master/images/6.gif?raw=true)

In [None]:
discouts_df = spark.read.format("csv").option("header","true").load("Tables/ExternalDataSideLoading/Generated-NYC-Taxi-Green-Discounts.csv")

display(discouts_df)

## Unpivot sideloaded data

The `import pandas as pd` line imports the Pandas library under the alias `pd`.

Melt the discounts DataFrame: `pd.melt()` converts the discounts data from wide to long format by turning the date columns into rows. First, `discouts_df.toPandas()` converts the PySpark DataFrame to a Pandas DataFrame. Then `pd.melt()` takes that Pandas DataFrame, uses `'VendorID'` as the identifier variable (`id_vars`), and names the new variable column `'date'` (`var_name`) and the new value column `'discount'` (`value_name`). The melted DataFrame is stored in `discouts_pd_df`.

Convert the melted DataFrame to a PySpark DataFrame: `spark.createDataFrame()` converts the melted Pandas DataFrame `discouts_pd_df` back to a PySpark DataFrame, which is stored in the `discounts_spark_df` variable.

In [None]:
import pandas as pd

# Melt discouts_df to long format
discouts_pd_df = pd.melt(discouts_df.toPandas(), id_vars=['VendorID'], var_name='date', value_name='discount')

discounts_spark_df = spark.createDataFrame(discouts_pd_df)

display(discounts_spark_df)
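To show what the unpivot does to the shape of the data, here is a plain-Python sketch on a made-up wide table with one row per vendor and one column per date. The values are hypothetical and kept as strings, on the assumption that a CSV read without schema inference yields string columns.

```python
# Hypothetical wide table: one row per vendor, one column per date.
wide_rows = [
    {"VendorID": "1", "2016-01-01": "0.10", "2016-01-02": "0.00"},
    {"VendorID": "2", "2016-01-01": "0.05", "2016-01-02": "0.20"},
]

# Emit one long row per (vendor, date column) pair.
long_rows = [
    {"VendorID": row["VendorID"], "date": column, "discount": value}
    for row in wide_rows
    for column, value in row.items()
    if column != "VendorID"
]

for row in long_rows:
    print(row)
```

Two vendors and two date columns become four rows. The row order here is vendor by vendor and may differ from the order `pd.melt()` produces.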

## Prepare data for join

In [None]:
from pyspark.sql.functions import to_date

nyc_taxi_df = spark.sql("SELECT * FROM lab140lakehouse.nyc_taxi")

nyc_taxi_df = nyc_taxi_df.withColumn("date", to_date("lpepPickupDatetime"))

display(nyc_taxi_df)

## Join the two datasets and save the result

In [None]:
from pyspark.sql.functions import col

# Create aliases for your DataFrames
df1_alias = nyc_taxi_df.alias("df1")
df2_alias = discounts_spark_df.alias("df2")

# Define the join condition using the aliases
join_condition = [col("df1.vendorID") == col("df2.VendorID"), col("df1.date") == col("df2.date")]

# Perform the join using the aliases
result_df = df1_alias.join(df2_alias, join_condition, how='inner')  # You can use other join types like 'left', 'right', 'outer', etc.

# Select only the desired columns
result_df = result_df.select("df1.vendorID", "df1.lpepPickupDatetime", "df2.discount")

display(result_df)

# Save the results to a new delta table
result_df.write.format("delta").mode("overwrite").saveAsTable("nyc_taxi_with_discounts")

![image-alt-text](https://github.com/ekote/azure-architect/blob/master/images/7.gif?raw=true)
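The inner join on vendor and date can be pictured as a dictionary lookup: index the discounts by `(VendorID, date)` and keep only the trips whose key is present. The sketch below uses made-up rows and assumes at most one discount per key.

```python
# Hypothetical trips and discounts.
trips = [
    {"vendorID": 1, "date": "2016-01-01", "lpepPickupDatetime": "2016-01-01 00:29:00"},
    {"vendorID": 2, "date": "2016-01-03", "lpepPickupDatetime": "2016-01-03 09:10:00"},
]
discounts = [
    {"VendorID": 1, "date": "2016-01-01", "discount": 0.10},
    {"VendorID": 2, "date": "2016-01-02", "discount": 0.20},
]

# Index the discounts by the join key.
discount_by_key = {(d["VendorID"], d["date"]): d["discount"] for d in discounts}

# Inner join: keep a trip only when a discount exists for its (vendor, date).
result = [
    {
        "vendorID": t["vendorID"],
        "lpepPickupDatetime": t["lpepPickupDatetime"],
        "discount": discount_by_key[(t["vendorID"], t["date"])],
    }
    for t in trips
    if (t["vendorID"], t["date"]) in discount_by_key
]
print(result)
```

The second trip is dropped because no discount exists for vendor 2 on 2016-01-03. A left join would keep it with an empty discount instead.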