# **DATA PROCESSING/TRANSFORMATION FRAMEWORK**

In [None]:
from pyspark.sql.functions import *

# EXTRACT
READ THE DATA AND LOAD IT INTO A DATAFRAME

In [None]:

# Settings for reading the source file (the values appear as asterisk placeholders).
# NOTE(review): the access key is a hard-coded literal — consider loading it from a
# secret store instead of keeping it in the notebook.
storage_account_name = "*****************"  # inserted into the Spark account-key setting name
storage_account_access_key = "****************"  # value stored under that setting
file_location = "**************"  # path handed to the reader's load()
file_type = "*************"  # format name handed to spark.read.format()



In [None]:
# Store the storage account's access key in the Spark configuration under the
# account-specific "fs.azure.account.key.<account>.blob.core.windows.net" setting.
account_key_setting = (
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
)
spark.conf.set(account_key_setting, storage_account_access_key)

In [None]:
def read_csv(file_type, file_location):
    """Load the file at ``file_location`` into a Spark DataFrame.

    ``file_type`` is used as the reader format. The reader is configured with
    the ``header`` and ``inferSchema`` options both set to ``"true"``.
    """
    reader = spark.read.format(file_type)
    reader = reader.option("header", "true")
    reader = reader.option("inferSchema", "true")
    return reader.load(file_location)

# Load the source file and preview its first five rows.
df = read_csv(file_type, file_location)
display(df.head(5))

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


# inspect data using pyspark
- Print schema
- Show first few rows
- Summary statistics
- Count total number of rows

In [None]:
# Print every column with its data type and nullability.
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [None]:
# List the column names of the loaded DataFrame.
print(df.columns)

['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']


In [None]:
# Summary statistics (count, mean, stddev, min, max) for every column.
display(df.describe())

summary,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
count,7043,7043,7043.0,7043,7043,7043.0,7043,7043,7043,7043,7043,7043,7043,7043,7043,7043,7043,7043,7043.0,7043.0,7043
mean,,,0.1621468124378816,,,32.37114865824223,,,,,,,,,,,,,64.76169246059922,2283.3004408418697,
stddev,,,0.3686116056100135,,,24.55948102309444,,,,,,,,,,,,,30.09004709767848,2266.771361883145,
min,0002-ORFBO,Female,0.0,No,No,0.0,No,No,DSL,No,No,No,No,No,No,Month-to-month,No,Bank transfer (automatic),18.25,,No
max,9995-HOTOH,Male,1.0,Yes,Yes,72.0,Yes,Yes,No,Yes,Yes,Yes,Yes,Yes,Yes,Two year,Yes,Mailed check,118.75,999.9,Yes


In [None]:
# Count the rows, then print the label and the count on separate lines.
row_count = df.count()
print("Total number of rows:", row_count, sep="\n")

Total number of rows:
7043


# data validation using pyspark
- check if dataframe is empty
- primary key check
- check for nulls

In [None]:
def data_validation(df):
    """Run basic quality checks on ``df`` and report each one that passes.

    Checks, in order:
      1. the DataFrame contains at least one row;
      2. ``customerID`` is unique across all rows (primary-key check);
      3. no column contains a null value.

    A confirmation line is printed for every check that passes.

    Raises:
        ValueError: if the DataFrame has no rows.
        Exception: if ``customerID`` has duplicates or a null value is found.
    """
    # Count once and reuse the result for the emptiness and primary-key checks.
    total_rows = df.count()
    if total_rows == 0:
        # Fail early with a clear message: the later checks are meaningless
        # without rows, and reducing an empty RDD raises an opaque ValueError.
        raise ValueError("DataFrame is empty")
    print("DataFrame is not empty")

    # Primary key check: every row must carry a distinct customerID.
    if df.select("customerID").distinct().count() != total_rows:
        raise Exception("Primary key check is violated.")
    print("ID column is unique")

    # Null check: a row is flagged as soon as any one of its columns is null.
    any_null = None
    for name in df.columns:
        is_null = col(name).isNull()
        any_null = is_null if any_null is None else (any_null | is_null)

    # limit(1) lets Spark stop at the first offending row. `any_null` stays
    # None only when the DataFrame has no columns, so there is nothing to scan.
    null_found = (
        any_null is not None
        and df.filter(any_null).limit(1).count() > 0
    )
    if null_found:
        raise Exception("Null values found")
    print("No null values found")


# Run the validation checks against the DataFrame loaded earlier in the notebook.
data_validation(df)



DataFrame is not empty
ID column is unique
No null values found


# data transformation using pyspark
If applicable/necessary
- filter
- aggregate
- split text
- cast data to desired data type
- date and time manipulation
- rearrange columns
- drop null values (if any)
- Remove duplicates (if any)
- Other business logic
- ERROR HANDLING

In [None]:
from pyspark.sql.functions import avg, col, lit

def transform_data(staging_table):
    """Build the load-ready subset of ``staging_table``.

    Keeps the rows whose ``PhoneService`` is "No" and whose ``MonthlyCharges``
    is at least 35.0, ordered by ascending ``MonthlyCharges``. The mean of
    ``TotalCharges`` over all input rows (taken before filtering) is attached
    to every row as the constant column ``average``.

    Returns a DataFrame with the columns customerID, gender, PhoneService,
    PaymentMethod, MonthlyCharges, TotalCharges and average.
    """
    output_columns = [
        'customerID', 'gender', 'PhoneService', 'PaymentMethod',
        'MonthlyCharges', 'TotalCharges', 'average',
    ]

    # The mean is computed on the unfiltered input.
    mean_row = staging_table.select(avg("TotalCharges")).first()
    overall_mean = mean_row[0]
    with_average = staging_table.withColumn("average", lit(overall_mean))

    # Row filter: no phone service and a monthly charge of at least 35.0.
    no_phone_service = col("PhoneService") == "No"
    charge_at_least_35 = col("MonthlyCharges") >= 35.0
    selected_rows = with_average.where(no_phone_service & charge_at_least_35)

    ordered_rows = selected_rows.orderBy(col("MonthlyCharges").asc())
    return ordered_rows.select(*output_columns)

In [None]:
# Count the rows of the loaded DataFrame and report the total on a single line.
row_count = df.count()
print(f"Total number of rows: {row_count}")

Total number of rows: 7043


In [None]:
# Show the column names of the transformed DataFrame.
# NOTE(review): `transformed_data` is assigned in the pipeline-execution cell
# further down, so that cell has to run before this one.
transformed_data.columns


['customerID',
 'gender',
 'PhoneService',
 'PaymentMethod',
 'MonthlyCharges',
 'TotalCharges',
 'average']

# Load it to Azure SQL

In [None]:
# Connection details for the SQL Server target and the JDBC URL built from them.
server_name = "**********"
database_name = "***********"
user_name = "*********"
password = "*************"

# One parenthesised group of f-strings; each line contributes one
# semicolon-terminated segment of the URL.
jdbcUrl = (
    f"jdbc:sqlserver://{server_name}.database.windows.net:1433;"
    f"database={database_name};"
    f"user={user_name}@{server_name};"
    f"password={password};"
)

In [None]:
def load_to_azSQL(transformed_data, table_name="telco_sample_dataset", mode="overwrite"):
    """Write ``transformed_data`` to a SQL table over JDBC.

    The connection uses the notebook-level ``jdbcUrl``, ``user_name`` and
    ``password`` variables.

    Args:
        transformed_data: DataFrame to write.
        table_name: Target table name. Defaults to "telco_sample_dataset".
        mode: Spark save mode passed to the writer. Defaults to "overwrite".

    Returns:
        True when the write completed, False when it raised. A failure is
        displayed rather than re-raised, so the notebook keeps running; check
        the return value to find out whether the load succeeded.
    """
    try:
        writer = (
            transformed_data.write
            .format("jdbc")
            .option("url", jdbcUrl)
            .option("dbtable", table_name)
            .option("user", user_name)
            .option("password", password)
            .mode(mode)
        )
        writer.save()
    except Exception as e:
        # Best-effort load: report the failure and signal it to the caller.
        display(f"An error occurred: {e}")
        return False
    else:
        display("DataFrame successfully written to Azure SQL Database.")
        return True

## ETL Pipeline Execution: Read CSV, Transform Data, Load to Azure SQL

In [None]:
# Extract: read the source file into a staging DataFrame.
staging_table = read_csv(file_type, file_location)
# Transform: filter, order and add the average column.
transformed_data = transform_data(staging_table)
# Load: write the transformed rows to the SQL table.
# NOTE(review): data_validation is not called on staging_table in this run —
# confirm whether validation should gate the load.
load_df = load_to_azSQL(transformed_data)

'DataFrame successfully written to Azure SQL Database.'


This table will persist across cluster restarts and allow various users across different notebooks to query this data.