# SQL Sat MN Lab #2 - Code notebooks

In [None]:
# Re-run safety: remove any ppp_loan_details table left by a previous run so the load below starts clean
drop_stmt = "DROP TABLE IF EXISTS ppp_loan_details"
spark.sql(drop_stmt)

In [None]:
# CSV paths to ingest in the next step, keyed by a 1-based sequence number.
# Only half of the files are listed, to keep the run time short.
_ppp_csv_paths = [
    "Files/sqlsat-labs/PPP/public_up_to_150k_1_230630.csv",
    "Files/sqlsat-labs/PPP/public_up_to_150k_2_230630.csv",
    "Files/sqlsat-labs/PPP/public_up_to_150k_3_230630.csv",
    "Files/sqlsat-labs/PPP/public_up_to_150k_4_230630.csv",
    "Files/sqlsat-labs/PPP/public_up_to_150k_5_230630.csv",
    "Files/sqlsat-labs/PPP/public_up_to_150k_6_230630.csv",
    "Files/sqlsat-labs/PPP/public_up_to_150k_7_230630.csv",
    "Files/sqlsat-labs/PPP/public_150k_plus_230630.csv",
]

# Number the paths 1..N in listing order; the load loop iterates over .items()
file_dict_1 = dict(enumerate(_ppp_csv_paths, start=1))
print(file_dict_1)

In [None]:
from pyspark.sql.types import LongType, DecimalType, DateType
from pyspark.sql.functions import lit, col, concat

# Load every CSV listed in file_dict_1 into the Delta table at Tables/ppp_loan_details.
# The first file is written with mode "overwrite"; every later file is appended.

# Projection applied to each file: a reduced column list with a few renames and
# two explicit casts. It does not depend on the file being read, so it is built
# once here and applied with a single select() instead of a chain of withColumn()
# calls (each withColumn adds another projection to the query plan).
# NAICSCode is not carried into the table; add
# col("NAICSCode").alias("NAICSIndustryCode") to this list to include it.
selected_columns = [
    col("LoanNumber"),
    col("DateApproved"),
    col("BorrowerName"),
    col("BorrowerCity"),
    col("BorrowerState"),
    col("BorrowerZip").alias("BorrowerZipCode"),
    col("CurrentApprovalAmount").cast(DecimalType(10, 2)).alias("ApprovedAmount"),
    col("FranchiseName"),
    col("ServicingLenderName").alias("LenderName"),
    col("RuralUrbanIndicator").alias("RuralorUrban"),
    col("BusinessAgeDescription").alias("BusinessAge"),
    col("JobsReported").cast(LongType()).alias("EmployeeCount"),
    col("BusinessType").alias("BusinessTypeDesc"),
]

# True until the first file has been handled; selects overwrite vs. append below
first_file = True

for key, v in file_dict_1.items():
    print(f"Key: {key}, Value: {v}")

    # Note that the file paths are treated differently if you're in DE persona vs. PBI persona.
    # Fully qualified alternative:
    # df = spark.read.format("csv").option("header","true").option("inferschema","true").load("abfss://Demos@onelake.dfs.fabric.microsoft.com/PPP.Lakehouse/"+v)
    # NOTE(review): column types are inferred separately for each file. If a column
    # infers to a different type in a later file, the append may be rejected as a
    # schema mismatch — confirm against the data or supply an explicit schema.
    df = spark.read.format("csv").option("header","true").option("inferschema","true").load(v)

    # First file replaces the table contents (and shows the inferred schema once);
    # all following files are added to it.
    if first_file:
        mode = "overwrite"
        first_file = False
        df.printSchema()
    else:
        mode = "append"

    # Reduce to the columns that are written to the table
    df_selection = df.select(*selected_columns)

    display(df_selection)

    # Report the row count of the frame that is actually written
    print(f'Writing {key} data to table - {df_selection.count()} records')
    # Write to delta table
    df_selection.write.mode(mode).format('delta').save("Tables/ppp_loan_details")


In [None]:
# Load the ppp_loan_details table into a Spark DataFrame, then report how many rows it holds
df_table = spark.read.table("ppp_loan_details")
row_count = df_table.count()
print(row_count)

In [None]:
# Render the Spark DataFrame read from ppp_loan_details using the notebook's display() output
display(df_table)

In [None]:
%%sql
-- Count the rows in ppp_loan_details with a SQL query instead of the DataFrame API
SELECT COUNT(*)
FROM ppp_loan_details

In [None]:
%%sql
-- Total approved loan amount per borrower state, largest total first
SELECT BorrowerState,
       SUM(ApprovedAmount) AS TotalApprovedAmount
  FROM ppp_loan_details
 GROUP BY BorrowerState
 ORDER BY TotalApprovedAmount DESC