In [5]:
import pandas as pd
import numpy as np
import datetime
import torch
from datetime import date, datetime, timedelta
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, variance, when
import pyspark.sql.functions as F
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, MinMaxScaler,  VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=e360d6f072578c9cb7dcfad9419b713f9dcc74fa4f97e310733f92e1a9d2dc1b
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


### IMPORT DATA

In [None]:
# IMPORT DATA
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('car_data.csv', header=True, inferSchema=True)
df.show(3)

+-------+------+---+------------+---------+
|User ID|Gender|Age|AnnualSalary|Purchased|
+-------+------+---+------------+---------+
|    385|  Male| 35|       20000|        0|
|    681|  Male| 40|       43500|        0|
|    353|  Male| 49|       74000|        0|
+-------+------+---+------------+---------+
only showing top 3 rows



In [None]:
df = df.withColumnRenamed('User ID', 'User_ID')

In [None]:
print('Num. rows:', df.count())
print('Num. Columns:', len(df.columns))

Num. rows: 1000
Num. Columns: 5


In [None]:
from datetime import datetime, timedelta
run_date = '2023-06-12'
str(datetime.strptime(run_date,"%Y-%m-%d") - timedelta(days=90))[:10]

'2023-03-14'

In [None]:
from datetime import datetime, timedelta

model_run_date = '06-22-2023'
[str(datetime.strptime(model_run_date, "%m-%d-%Y") - timedelta(days=30*x))[:10] for x in [0, 1, 2, 3, 4, 5, 6]]


['2023-06-22',
 '2023-05-23',
 '2023-04-23',
 '2023-03-24',
 '2023-02-22',
 '2023-01-23',
 '2022-12-24']

In [None]:
model_run_date = '2023-06-22'
forecast_days = 90
[str(datetime.strptime(model_run_date,"%Y-%m-%d") + timedelta(days=forecast_days*x))[:10] for x in [0, 1]]

['2023-06-22', '2023-09-20']

### SQL COMMANDS

In [None]:
#understand Parquet files + udf

In [None]:
# Register the DataFrame as a temporary table/view
df.createOrReplaceTempView("car_data")
above_30 = spark.sql('SELECT * FROM car_data cd where cd.Age >= 35')
above_30.show()

+-------+------+---+------------+---------+
|User_ID|Gender|Age|AnnualSalary|Purchased|
+-------+------+---+------------+---------+
|    385|  Male| 35|       20000|        0|
|    681|  Male| 40|       43500|        0|
|    353|  Male| 49|       74000|        0|
|    895|  Male| 40|      107500|        1|
|    846|Female| 47|       33500|        1|
|    219|Female| 46|      132500|        1|
|    588|  Male| 42|       64000|        0|
|    465|  Male| 41|       52000|        0|
|    686|  Male| 42|       80000|        0|
|    408|  Male| 47|       23000|        1|
|    118|Female| 42|      108000|        1|
|     90|  Male| 35|       75000|        0|
|    372|  Male| 35|       53000|        0|
|    926|  Male| 46|       79000|        1|
|     94|Female| 39|      134000|        1|
|    338|Female| 39|       51500|        0|
|    134|Female| 49|       39000|        1|
|    821|  Male| 54|       25500|        1|
|    294|Female| 41|       61500|        0|
|     20|  Male| 40|      107000

### Partitioning

In [None]:
df.show(10)

+-------+------+---+------------+---------+
|User_ID|Gender|Age|AnnualSalary|Purchased|
+-------+------+---+------------+---------+
|    385|  Male| 35|       20000|        0|
|    681|  Male| 40|       43500|        0|
|    353|  Male| 49|       74000|        0|
|    895|  Male| 40|      107500|        1|
|    661|  Male| 25|       79000|        0|
|    846|Female| 47|       33500|        1|
|    219|Female| 46|      132500|        1|
|    588|  Male| 42|       64000|        0|
|     85|Female| 30|       84500|        0|
|    465|  Male| 41|       52000|        0|
+-------+------+---+------------+---------+
only showing top 10 rows



In [None]:
spark.sql('SELECT User_ID, Gender, Age, AVG(AnnualSalary) OVER (PARTITION BY Gender) AS AvgSalary_Gender\
 FROM car_data').show(5)

+-------+------+---+-----------------+
|User_ID|Gender|Age| AvgSalary_Gender|
+-------+------+---+-----------------+
|    846|Female| 47|74802.32558139534|
|    219|Female| 46|74802.32558139534|
|     85|Female| 30|74802.32558139534|
|    790|Female| 32|74802.32558139534|
|    116|Female| 27|74802.32558139534|
+-------+------+---+-----------------+
only showing top 5 rows



In [None]:
spark.sql('SELECT User_ID, Gender, COUNT(Gender) OVER (PARTITION BY Gender) AS TotalGender \
FROM car_data').show(5)

+-------+------+-----------+
|User_ID|Gender|TotalGender|
+-------+------+-----------+
|    846|Female|        516|
|    219|Female|        516|
|     85|Female|        516|
|    790|Female|        516|
|    116|Female|        516|
+-------+------+-----------+
only showing top 5 rows



### Partition by + Lag

This query calculates time intervals, from each email {open,click,delivery} event by user.

```sql
SELECT email_all.upm_id upm_id,
       datediff(first_open_ts_utc, lag(first_open_ts_utc) OVER (PARTITION BY upm_id ORDER BY first_open_ts_utc)) AS open_interval,
       datediff(first_click_ts_utc, lag(first_click_ts_utc) OVER (PARTITION BY upm_id ORDER BY first_click_ts_utc)) AS click_interval,
       datediff(first_delivery_ts_utc, lag(first_delivery_ts_utc) OVER (PARTITION BY upm_id ORDER BY first_delivery_ts_utc)) AS delivery_interval
FROM comms_atc.combined_campaign_email_data email_all
JOIN campaign_metadata meta ON email_all.comm_id = meta.comm_id
WHERE substring(first_del_dt, 1, 10) >= '{lookback_start_date}' AND
      substring(first_del_dt, 1, 10) < '{lookback_end_date}'


In [20]:
!pip install openpyxl

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [33]:
import pandas as pd

# Read the Excel file using pandas
df_bank_pd = pd.read_excel("bank.xlsx")

# Convert the DataFrame to a CSV file
df_bank_pd.to_csv("bank.csv", index=False)

# Read the CSV file in Spark
df_bank = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("bank.csv")

# Display the DataFrame
df_bank.show()

+-------------+----------+--------------------+-------+----------+--------------+-----------+-----------+---+
|   Account No|      DATE| TRANSACTION DETAILS|CHQ.NO.|VALUE DATE|WITHDRAWAL AMT|DEPOSIT AMT|BALANCE AMT|  .|
+-------------+----------+--------------------+-------+----------+--------------+-----------+-----------+---+
|409000611074'|2017-06-29|TRF FROM  Indiafo...|   null|2017-06-29|          null|  1000000.0|  1000000.0|  .|
|409000611074'|2017-07-05|TRF FROM  Indiafo...|   null|2017-07-05|          null|  1000000.0|  2000000.0|  .|
|409000611074'|2017-07-18|FDRL/INTERNAL FUN...|   null|2017-07-18|          null|   500000.0|  2500000.0|  .|
|409000611074'|2017-08-01|TRF FRM  Indiafor...|   null|2017-08-01|          null|  3000000.0|  5500000.0|  .|
|409000611074'|2017-08-16|FDRL/INTERNAL FUN...|   null|2017-08-16|          null|   500000.0|  6000000.0|  .|
|409000611074'|2017-08-16|FDRL/INTERNAL FUN...|   null|2017-08-16|          null|   500000.0|  6500000.0|  .|
|409000611

In [34]:
from pyspark.sql.functions import regexp_replace #replace ' with ''

df_bank = df_bank.withColumn('Account No', regexp_replace(df_bank['Account No'], "'", ''))
#df_bank = df_bank.withColumn('Account No', F.col('Account No').cast('integer')) #convert to int

In [35]:
df_bank = df_bank.withColumnRenamed('Account No', 'Account_No').withColumnRenamed('DEPOSIT AMT', 'Deposit_amt')\
.withColumnRenamed('BALANCE AMT', 'Balance_amt')
df_bank.printSchema()

root
 |-- Account_No: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- TRANSACTION DETAILS: string (nullable = true)
 |-- CHQ.NO.: double (nullable = true)
 |-- VALUE DATE: date (nullable = true)
 |-- WITHDRAWAL AMT: double (nullable = true)
 |-- Deposit_amt: double (nullable = true)
 |-- Balance_amt: double (nullable = true)
 |-- .: string (nullable = true)



In [41]:
df_bank.select('Account_No').distinct().show(100) #Unique customers/accounts

+------------+
|  Account_No|
+------------+
|409000493210|
|409000438611|
|409000611074|
|     1196711|
|409000425051|
|409000438620|
|409000493201|
|409000405747|
|     1196428|
|409000362497|
+------------+



In [62]:
spark.sql('''
    SELECT Account_No, Date, Deposit_amt,
        AVG(Deposit_amt) OVER (PARTITION BY Account_No) AS AVG_Deposit_Account
    FROM bank_data
''').show(10)

+----------+----------+-----------+-------------------+
|Account_No|      Date|Deposit_amt|AVG_Deposit_Account|
+----------+----------+-----------+-------------------+
|   1196428|2015-01-01|  1200000.0|  2130827.330993709|
|   1196428|2015-01-01|   800000.0|  2130827.330993709|
|   1196428|2015-01-02|    15000.0|  2130827.330993709|
|   1196428|2015-01-02|    25000.0|  2130827.330993709|
|   1196428|2015-01-02|    25000.0|  2130827.330993709|
|   1196428|2015-01-02|    25000.0|  2130827.330993709|
|   1196428|2015-01-02|       null|  2130827.330993709|
|   1196428|2015-01-02|       null|  2130827.330993709|
|   1196428|2015-01-02|  1500000.0|  2130827.330993709|
|   1196428|2015-01-02|   172620.0|  2130827.330993709|
+----------+----------+-----------+-------------------+
only showing top 10 rows



In [63]:
df_bank.createOrReplaceTempView('bank_data')
spark.sql('''
    SELECT Account_No, Deposit_amt,
        AVG(Deposit_amt) OVER (PARTITION BY Account_No) AS AVG_Deposit_Account
    FROM bank_data
''').tail(10)#.show()

[Row(Account_No='409000611074', Deposit_amt=None, AVG_Deposit_Account=463046.4095238095),
 Row(Account_No='409000611074', Deposit_amt=300000.0, AVG_Deposit_Account=463046.4095238095),
 Row(Account_No='409000611074', Deposit_amt=None, AVG_Deposit_Account=463046.4095238095),
 Row(Account_No='409000611074', Deposit_amt=None, AVG_Deposit_Account=463046.4095238095),
 Row(Account_No='409000611074', Deposit_amt=None, AVG_Deposit_Account=463046.4095238095),
 Row(Account_No='409000611074', Deposit_amt=None, AVG_Deposit_Account=463046.4095238095),
 Row(Account_No='409000611074', Deposit_amt=300000.0, AVG_Deposit_Account=463046.4095238095),
 Row(Account_No='409000611074', Deposit_amt=None, AVG_Deposit_Account=463046.4095238095),
 Row(Account_No='409000611074', Deposit_amt=None, AVG_Deposit_Account=463046.4095238095),
 Row(Account_No='409000611074', Deposit_amt=None, AVG_Deposit_Account=463046.4095238095)]

In [57]:
#Use datediff()  --> DATE is cast in date datatype
current_date = '2017-08-18'
spark.sql('''
SELECT Account_No, DATE, Deposit_amt,
      datediff("{}", DATE) AS time_ago

FROM bank_data
          '''.format(current_date)).show()

+------------+----------+-----------+--------+
|  Account_No|      DATE|Deposit_amt|time_ago|
+------------+----------+-----------+--------+
|409000611074|2017-06-29|  1000000.0|      50|
|409000611074|2017-07-05|  1000000.0|      44|
|409000611074|2017-07-18|   500000.0|      31|
|409000611074|2017-08-01|  3000000.0|      17|
|409000611074|2017-08-16|   500000.0|       2|
|409000611074|2017-08-16|   500000.0|       2|
|409000611074|2017-08-16|   500000.0|       2|
|409000611074|2017-08-16|   500000.0|       2|
|409000611074|2017-08-16|   500000.0|       2|
|409000611074|2017-08-16|   500000.0|       2|
|409000611074|2017-08-16|       null|       2|
|409000611074|2017-08-16|       null|       2|
|409000611074|2017-08-16|       null|       2|
|409000611074|2017-08-16|       null|       2|
|409000611074|2017-08-16|       null|       2|
|409000611074|2017-08-16|       null|       2|
|409000611074|2017-08-16|       null|       2|
|409000611074|2017-08-16|       null|       2|
|409000611074

# GOOD QUERY 😁

In [68]:
#Partition + Lag to find DEPOSIT INTERVAL
spark.sql('''
SELECT Account_No, DATE, Deposit_amt,
  datediff(DATE, lag(DATE) OVER (PARTITION BY Account_No ORDER BY DATE)) AS Deposit_interval

FROM bank_data
          ''').show(150)

+----------+----------+-----------+----------------+
|Account_No|      DATE|Deposit_amt|Deposit_interval|
+----------+----------+-----------+----------------+
|   1196428|2015-01-01|  1200000.0|            null|
|   1196428|2015-01-01|   800000.0|               0|
|   1196428|2015-01-02|    15000.0|               1|
|   1196428|2015-01-02|    25000.0|               0|
|   1196428|2015-01-02|    25000.0|               0|
|   1196428|2015-01-02|    25000.0|               0|
|   1196428|2015-01-02|       null|               0|
|   1196428|2015-01-02|       null|               0|
|   1196428|2015-01-02|  1500000.0|               0|
|   1196428|2015-01-02|   172620.0|               0|
|   1196428|2015-01-02|   700000.0|               0|
|   1196428|2015-01-03|       null|               1|
|   1196428|2015-01-03|  2200000.0|               0|
|   1196428|2015-01-03|    48160.0|               0|
|   1196428|2015-01-03|   700000.0|               0|
|   1196428|2015-01-05|       null|           

### EDA

In [None]:
df.printSchema()

root
 |-- User ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- AnnualSalary: integer (nullable = true)
 |-- Purchased: integer (nullable = true)



In [None]:
#null val count
for col_ in df.columns:
  print(f'{col_} N/A:', df.filter(col(col_).isNull()).count())

User ID N/A: 0
Gender N/A: 0
Age N/A: 0
AnnualSalary N/A: 0
Purchased N/A: 0


In [None]:
df.groupBy('Gender').count().show() #unique vals in gender

+------+-----+
|Gender|count|
+------+-----+
|Female|  516|
|  Male|  484|
+------+-----+



In [None]:
#Find correlation with outcome var
def corr_(var):
  for col_ in df.columns:
    if col_ != var: continue
    for col2 in df.columns:
      if col_==col2 or df.select(F.col(col_)).dtypes[0][1] == 'string' or df.select(F.col(col2)).dtypes[0][1] == 'string': continue
      else:print('Correlation ({}, {}):'.format(col_, col2), df.select(F.corr(col_, col2)).first()[0])

corr_('Purchased')

Correlation (Purchased, User ID): 0.015910931566033848
Correlation (Purchased, Age): 0.6160363973749042
Correlation (Purchased, AnnualSalary): 0.3649744312828344


In [None]:
df.describe().toPandas()

Unnamed: 0,summary,User ID,Gender,Age,AnnualSalary,Purchased
0,count,1000.0,1000,1000.0,1000.0,1000.0
1,mean,500.5,,40.106,72689.0,0.402
2,stddev,288.8194360957494,,10.707072681429098,34488.34186685011,0.490547282773676
3,min,1.0,Female,18.0,15000.0,0.0
4,max,1000.0,Male,63.0,152500.0,1.0


#Logit Modeling

In [None]:
df.printSchema()

root
 |-- User ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- AnnualSalary: integer (nullable = true)
 |-- Purchased: integer (nullable = true)



In [None]:
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('car_data.csv', header=True, inferSchema=True)
df.show(3)

# Categorize Columns
indexer = StringIndexer(inputCol='Gender', outputCol='Gender_encoded').fit(df)
df = indexer.transform(df).select([x if x != 'Gender' else 'Gender_encoded' for x in df.columns])

# Random .8/.2 split
train, test = df.randomSplit([.8, .2], seed=0)

# Vectorize
v_assembler = VectorAssembler(inputCols=[col for col in df.columns if col != 'Purchased'],
                              outputCol='features')
df = v_assembler.transform(df).select('features', 'Purchased')

# Scale
scaler = MinMaxScaler(inputCol='features', outputCol='features_scaled').fit(df)
df = scaler.transform(df).select('features_scaled', 'Purchased')

df.show()

# Fit and Train Model
lr = LogisticRegression(featuresCol='features_scaled', labelCol= 'Purchased')
lr.setParams(regParam=0.01, elasticNetParam=0.5)

pipeline = Pipeline(stages=[indexer, v_assembler, scaler, lr])
model = pipeline.fit(train)


+-------+------+---+------------+---------+
|User ID|Gender|Age|AnnualSalary|Purchased|
+-------+------+---+------------+---------+
|    385|  Male| 35|       20000|        0|
|    681|  Male| 40|       43500|        0|
|    353|  Male| 49|       74000|        0|
+-------+------+---+------------+---------+
only showing top 3 rows

+--------------------+---------+
|     features_scaled|Purchased|
+--------------------+---------+
|[0.38438438438438...|        0|
|[0.68068068068068...|        0|
|[0.35235235235235...|        0|
|[0.89489489489489...|        1|
|[0.66066066066066...|        0|
|[0.84584584584584...|        1|
|[0.21821821821821...|        1|
|[0.58758758758758...|        0|
|[0.08408408408408...|        0|
|[0.46446446446446...|        0|
|[0.68568568568568...|        0|
|[0.40740740740740...|        1|
|[0.78978978978978...|        0|
|[0.11511511511511...|        0|
|[0.11711711711711...|        1|
|[0.05305305305305...|        1|
|[0.08908908908908...|        0|
|[0.371

In [None]:
#Predict with model --> RAWPREDICTION IS VALUE BEFORE SIGMOID FUNCTION/SOFTMAX
pred = model.transform(test)
pred.show()

+-------+--------------+---+------------+---------+--------------------+--------------------+--------------------+--------------------+----------+
|User ID|Gender_encoded|Age|AnnualSalary|Purchased|            features|     features_scaled|       rawPrediction|         probability|prediction|
+-------+--------------+---+------------+---------+--------------------+--------------------+--------------------+--------------------+----------+
|     21|           1.0| 36|       62500|        0|[21.0,1.0,36.0,62...|[0.02002002002002...|[1.66424712854177...|[0.84080730600946...|       0.0|
|     22|           0.0| 56|      131500|        1|[22.0,0.0,56.0,13...|[0.02102102102102...|[-3.4950924495082...|[0.02945218841335...|       1.0|
|     29|           1.0| 36|       40500|        0|[29.0,1.0,36.0,40...|[0.02802802802802...|[2.23988930854211...|[0.90377483233897...|       0.0|
|     30|           1.0| 27|       58000|        0|[30.0,1.0,27.0,58...|[0.02902902902902...|[3.37918598563868...|[0.9

In [None]:
#ADD MORE EVALUATORS
evaluator = BinaryClassificationEvaluator(labelCol='Purchased')
areaUnderROC = evaluator.evaluate(pred, {evaluator.metricName: 'areaUnderROC'})
#areaUnderPR = 'areaUnderPR'
print(areaUnderROC)

0.9261607142857144


In [None]:
#Gridsearch to optimize Logistic Regression HyperParameters

In [None]:
#Evaluate Model Performance