# Testing running databricks locally

In [2]:
# Starting spark session
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
spark = SparkSession.builder \
    .master("local[1]") \
    .getOrCreate()


In [3]:
# Imports
import pandas as pd
import re

import pyspark.sql.functions as F
from pyspark.sql.functions import row_number, udf, col, sys, datediff, to_date, mean, stddev
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

In [14]:

import pandas as pd   

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql import SparkSession


 
data = [['Scott', 50], ['Jeff', 45], ['Thomas', 54],['Ann',34]] 
  
# Create the pandas DataFrame 
pandasDF = pd.DataFrame(data, columns = ['Name', 'Age']) 
  
# print dataframe. 
print(pandasDF)


spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

sparkDF=spark.createDataFrame(pandasDF) 
sparkDF.printSchema()
sparkDF.show()

#sparkDF=spark.createDataFrame(pandasDF.astype(str)) 
mySchema = StructType([ StructField("First Name", StringType(), True)\
                       ,StructField("Age", IntegerType(), True)])

sparkDF2 = spark.createDataFrame(pandasDF,schema=mySchema)
sparkDF2.printSchema()
sparkDF2.show()

# Enable Apache Arrow to convert Pandas to PySpark DataFrame
spark.conf.set("spark.sql.execution.arrow.enabled","true")
sparkDF2=spark.createDataFrame(pandasDF) 
sparkDF2.printSchema()
sparkDF2.show()

#Convert PySpark DataFrame to Pandas
pandasDF2=sparkDF2.select("*").toPandas
print(pandasDF2)

     Name  Age
0   Scott   50
1    Jeff   45
2  Thomas   54
3     Ann   34
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


+------+---+
|  Name|Age|
+------+---+
| Scott| 50|
|  Jeff| 45|
|Thomas| 54|
|   Ann| 34|
+------+---+

root
 |-- First Name: string (nullable = true)
 |-- Age: integer (nullable = true)

+----------+---+
|First Name|Age|
+----------+---+
|     Scott| 50|
|      Jeff| 45|
|    Thomas| 54|
|       Ann| 34|
+----------+---+

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


+------+---+
|  Name|Age|
+------+---+
| Scott| 50|
|  Jeff| 45|
|Thomas| 54|
|   Ann| 34|
+------+---+

<bound method PandasConversionMixin.toPandas of DataFrame[Name: string, Age: bigint]>


In [13]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [4]:
def state_to_code( x):
    """
    Turns states to state code
    
    :param x : The string in question
    :returns: prediction of the intended state code
    """
    states = {'AK': 'Alaska','AL': 'Alabama','AR': 'Arkansas','AS': 'American Samoa','AZ': 'Arizona',
                'CA': 'California', 'CO': 'Colorado','CT': 'Connecticut','DC': 'District of Columbia',
                'DE': 'Delaware','FL': 'Florida','GA': 'Georgia','GU': 'Guam','HI': 'Hawaii','IA': 'Iowa',
                'ID': 'Idaho','IL': 'Illinois','IN': 'Indiana','KS': 'Kansas','KY': 'Kentucky','LA': 'Louisiana',
                'MA': 'Massachusetts','MD': 'Maryland','ME': 'Maine','MI': 'Michigan','MN': 'Minnesota',
                'MO': 'Missouri','MP': 'Northern Mariana Islands','MS': 'Mississippi','MT': 'Montana',
                'NC': 'North Carolina','ND': 'North Dakota','NE': 'Nebraska','NH': 'New ','NJ': 'New Jersey',
                'NM': 'New Mexico','NV': 'Nevada','NY': 'New York','OH': 'Ohio','OK': 'Oklahoma','OR': 'Oregon',
                'PA': 'Pennsylvania','PR': 'Puerto Rico','RI': 'Rhode Island','SC': 'South Carolina',
                'SD': 'South Dakota','TN': 'Tennessee','TX': 'Texas','UT': 'Utah','VA': 'Virginia',
                'VI': 'Virgin Islands','VT': 'Vermont','WA': 'Washington','WI': 'Wisconsin','WV': 'West Virginia',
                'WY': 'Wyoming'}
    
    if len(x) == 2: # Try another way for 2-letter codes
        for a,n in states.items():
            if len(n.split()) == 2:
                if "".join([c[0] for c in n.split()]).lower() == x.lower():
                    return a.upper()
    new_rx = re.compile(r"\w*".join([ch for ch in x]), re.I)
    for a,n in states.items():
        if new_rx.match(n):
            return a.upper()

state_udf = udf(state_to_code, StringType()) #

def remove_outliers(df,columns,n_std):
    """
    Function to remove any value past a number of standard deviations from the mean of values in a dateframe
    
    :param df: The dateframe you want to remove the outliers from
    :param columns: The name of the columns
    :returns: Returns dataframe with outliers removed
    """
    
    for column in columns:
        avg = df.agg({column: 'mean'})
        av = avg.first()[f'avg({column})']
        std = df.agg({column: 'stddev'})
        sd = std.first()[f'stddev({column})']

        df = df[(df[column] <= av + (n_std * sd))]
        df = df[(df[column] >= av - (n_std * sd))]
    return df




In [27]:
# Load data
cust_pd = pd.read_csv("data/customers_data.csv")
tran_pd = pd.read_csv("data/transactions_data.csv")

spark = SparkSession.builder \
    .master("local[1]") \
    .getOrCreate()

cust_df = spark.createDataFrame(cust_pd) 
tran_df = spark.createDataFrame(tran_pd[:100000]) 
cust_df.printSchema()
tran_df.show()


# # remove outliers and null
# cust_df = cust_df.dropna()
# tran_df =tran_df.dropna()
# cust_df = remove_outliers(cust_df, ['start_balance'], 4)         
# tran_df = remove_outliers(tran_df, ['deposit', 'amount', 'withdrawal'], 4)  

  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


root
 |-- customer_id: long (nullable = true)
 |-- dob: string (nullable = true)
 |-- state: string (nullable = true)
 |-- start_balance: double (nullable = true)
 |-- creation_date: string (nullable = true)

+----------+----------+-----------+--------+----------------+-------+----------+
|      date|account_id|customer_id|  amount|transaction_date|deposit|withdrawal|
+----------+----------+-----------+--------+----------------+-------+----------+
|2007-01-31|  24137947|         91| 3034.26|      2007-01-31|3034.26|       0.0|
|2007-01-31|  24137947|         91|-5295.18|      2007-01-16|    0.0|  -5295.18|
|2007-02-28|  24137947|         91|     0.0|      2007-02-28|    0.0|       0.0|
|2007-03-31|  24137947|         91|    -0.0|      2007-03-30|    0.0|      -0.0|
|2007-03-31|  24137947|         91|    -0.0|      2007-03-11|    0.0|      -0.0|
|2007-03-31|  24137947|         91|    -0.0|      2007-03-20|    0.0|      -0.0|
|2007-01-31|  24137948|         92|     0.0|      2007-01-31| 

DataFrame[date: string, account_id: bigint, customer_id: bigint, amount: double, transaction_date: string, deposit: double, withdrawal: double]

In [26]:
df = tran_df

# Add a column for volume
w = Window().orderBy(['account_id', "date"])
df = df.withColumn("volume", row_number().over(w))
# Aggragate to monthly data
df = df.groupby(["account_id", "date"]).agg({"account_id": "mean",
                                            "customer_id": "mean",
                                            "amount": "sum",
                                            "deposit":"sum",
                                            "withdrawal":"sum",
                                            "volume": "count"
                                            })

df = df.withColumnRenamed("avg(customer_id)", "customer_id") \
       .withColumnRenamed("sum(amount)","amount")\
       .withColumnRenamed("sum(withdrawal)","withdrawal")\
       .withColumnRenamed("sum(deposit)","deposit")\
       .withColumnRenamed("count(volume)","volume") \
       .drop("avg(account_id)") \
       .withColumn("customer_id", col("customer_id").cast("double"))

# Join customer infromation to transactions
df = df.join(cust_df,"customer_id")  

df= df.withColumn("state", state_udf("state")) #
df = df.withColumn('balance', F.sum(df["amount"]).over(Window.partitionBy('account_id').orderBy("date").rowsBetween(-sys.maxsize, 0)))
df = df.withColumn("balance", col("balance")+col("start_balance"))
df= df.withColumn('account_length',datediff(col("date"),col("creation_date")))
df= df.withColumn('age',datediff(col("date"),col("dob")))
# Calculate churn
w2 = Window.partitionBy("account_id")
df = df.withColumn("last_date", F.max("date").over(w2))
df = df.withColumn('isChurn', F.when((F.col("last_date") == col('date')),1).otherwise(0))

df = df.drop("customer_id","dob", "creation_date","last_date")

df.show()

+----------+----------+-------------------+----------+-------+------+-----+-------------+------------------+--------------+----+-------+
|account_id|      date|             amount|withdrawal|deposit|volume|state|start_balance|           balance|account_length| age|isChurn|
+----------+----------+-------------------+----------+-------+------+-----+-------------+------------------+--------------+----+-------+
|  24137947|2007-01-31|           -2260.92|  -5295.18|3034.26|     2|   CA|     10180.56| 7919.639999999999|             0|4962|      0|
|  24137947|2007-02-28|                0.0|       0.0|    0.0|     1|   CA|     10180.56| 7919.639999999999|            28|4990|      0|
|  24137947|2007-03-31|                0.0|       0.0|    0.0|     3|   CA|     10180.56| 7919.639999999999|            59|5021|      1|
|  24137948|2007-01-31|                0.0|       0.0|    0.0|     1|   NY|      4757.68|           4757.68|             0|7727|      0|
|  24137948|2007-02-28|             1164.

In [35]:
# Macro Economics


gdp = spark.read.option("header",True).csv("data/GDP_data_quarterly.csv")

# resample the GDP in pandas
gdp = gdp.toPandas()
gdp["date"] = pd.to_datetime(gdp["date"])
gdp = gdp.set_index('date').resample('M').ffill()
gdp = gdp.loc['2007-01-31':'2020-05-31']
gdp = gdp.reset_index()
gdp_rs = spark.createDataFrame(gdp) 


income = spark.read.option("header",True).csv("data/income_data.csv")
interest = spark.read.option("header",True).csv("data/interest_rate_data.csv")
umcs = spark.read.option("header",True).csv("data/cust_senti_data.csv")
unemployment = spark.read.option("header",True).csv("data/unemployment_data.csv")

macro_df = interest.join(income, "date","inner")
macro_df = macro_df.join(umcs,"date","inner")
macro_df = macro_df.join(unemployment,"date","inner")
macro_df = macro_df.withColumn("date",to_date(col("date"),"dd/MM/yyyy")) 

macro_df = macro_df.join(gdp_rs,"date","inner")

macro_df.show()



  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


+----------+-------------+-----------+---------------+-------+----------+---------+------------+
|      date|interest_rate|real_income|p_change_income|UMCSENT|unemp_rate|    GDPC1|p_change_gdp|
+----------+-------------+-----------+---------------+-------+----------+---------+------------+
|2007-01-31|         6.25|    11439.6|         0.3148|   91.3|       4.5|15767.146| 0.609644003|
|2007-02-28|         6.25|      11475|         0.3095|   88.4|       4.4|15767.146| 0.609644003|
|2007-03-31|         6.25|    11490.1|         0.1316|   87.1|       4.5|15767.146| 0.609644003|
|2007-04-30|         6.25|    11495.5|          0.047|   88.3|       4.4|15767.146| 0.609644003|
|2007-05-31|         6.25|    11488.4|        -0.0618|   85.3|       4.6|15767.146| 0.609644003|
|2007-06-30|         6.25|    11500.4|         0.1045|   90.4|       4.7|15767.146| 0.609644003|
|2007-07-31|         5.75|    11506.2|         0.0504|   83.4|       4.6|15767.146| 0.609644003|
|2007-08-31|         5.25|    

In [None]:
# Combine macro with transction data
df = df.join(macro_df,"date", "inner")
display(df)


In [43]:

# save data
pd_df = df.toPandas()
pd_df.to_csv("data/churn.csv")


  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
