In [5]:
# Exercise 47: Reading Data from a CSV File Using the PySpark Object

# 1. import all the required packages:


import os
import pandas as pd
import numpy as np
import collections
from sklearn.base import TransformerMixin
import random
# import pandas_profiling

In [6]:
import seaborn as sns
import time
import re
import matplotlib.pyplot as plt

In [8]:
# 2. Import all the libraries required for Spark to build the Spark session:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-bank').getOrCreate()

In [9]:
# 3. Create and read the data from the CSV file using the Spark object, as illustrated:


df_csv = spark.read.csv('bank.csv', sep=';', header = True, inferSchema = True)

In [20]:
# 4. Now let's print the first five rows from the Spark object using the following command:

df_csv.head(5)

[Row(age=30, job='unemployed', marital='married', education='primary', default='no', balance=1787, housing='no', loan='no', contact='cellular', day=19, month='oct', duration=79, campaign=1, pdays=-1, previous=0, poutcome='unknown', y='no'),
 Row(age=33, job='services', marital='married', education='secondary', default='no', balance=4789, housing='yes', loan='yes', contact='cellular', day=11, month='may', duration=220, campaign=1, pdays=339, previous=4, poutcome='failure', y='no'),
 Row(age=35, job='management', marital='single', education='tertiary', default='no', balance=1350, housing='yes', loan='no', contact='cellular', day=16, month='apr', duration=185, campaign=1, pdays=330, previous=1, poutcome='failure', y='no'),
 Row(age=30, job='management', marital='married', education='tertiary', default='no', balance=1476, housing='yes', loan='yes', contact='unknown', day=3, month='jun', duration=199, campaign=4, pdays=-1, previous=0, poutcome='unknown', y='no'),
 Row(age=59, job='blue-coll

In [12]:
df_csv1 = df_csv

In [19]:
# 5. The previous output is unstructured.
# Let's first identify the data types to proceed to get the structured data.
# Use the following command to print the datatype of each column:

df_csv1.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



In [21]:
# 6. Now let's calculate the total number of rows and columns with names
# to get a clear idea of the data we have:

df_csv1.count()

4521

In [23]:
len(df_csv1.columns), df_csv1.columns

(17,
 ['age',
  'job',
  'marital',
  'education',
  'default',
  'balance',
  'housing',
  'loan',
  'contact',
  'day',
  'month',
  'duration',
  'campaign',
  'pdays',
  'previous',
  'poutcome',
  'y'])

In [24]:
# 7. Print the summary statistics for the DataFrame using the following command:

df_csv1.describe()

DataFrame[summary: string, age: string, job: string, marital: string, education: string, default: string, balance: string, housing: string, loan: string, contact: string, day: string, month: string, duration: string, campaign: string, pdays: string, previous: string, poutcome: string, y: string]

In [25]:
df_csv1.describe().show()

+-------+------------------+-------+--------+---------+-------+------------------+-------+----+--------+------------------+-----+------------------+------------------+------------------+------------------+--------+----+
|summary|               age|    job| marital|education|default|           balance|housing|loan| contact|               day|month|          duration|          campaign|             pdays|          previous|poutcome|   y|
+-------+------------------+-------+--------+---------+-------+------------------+-------+----+--------+------------------+-----+------------------+------------------+------------------+------------------+--------+----+
|  count|              4521|   4521|    4521|     4521|   4521|              4521|   4521|4521|    4521|              4521| 4521|              4521|              4521|              4521|              4521|    4521|4521|
|   mean| 41.17009511170095|   null|    null|     null|   null|1422.6578190665782|   null|null|    null|15.9152842291528

In [26]:
# 8. To select multiple columns from a DataFrame, we can use the 
# df_csv1.select('col1', 'col2', 'col3') function.
# For example, let's select the first five rows from the balance 
# and y columns using the following command:

df_csv1.select('balance','y').show(5)

+-------+---+
|balance|  y|
+-------+---+
|   1787| no|
|   4789| no|
|   1350| no|
|   1476| no|
|      0| no|
+-------+---+
only showing top 5 rows



In [27]:
# 9. To identify the relation between two variables in terms of their frequency of levels,
# crosstab can be used.
# To derive crosstab between two columns, we can use the df_csv1.crosstab('col1', col2)
# function. 
# Crosstab iscarried out between two categorical variables and not between
# numerical variables:

df_csv1.crosstab('y', 'marital').show()

+---------+--------+-------+------+
|y_marital|divorced|married|single|
+---------+--------+-------+------+
|      yes|      77|    277|   167|
|       no|     451|   2520|  1029|
+---------+--------+-------+------+



In [28]:
# 10. Now, let's add a new column to the dataset.

# We are using here sample() method. It is used for taking samples from the large 
# dataset.
# Sampling is the process of determining a representative subgroup from the dataset 
# for 
# a specified casestudy. Sampling stands for crucial research and business decision 
# results.
# For this reason, it is essential touse the most appropriate and useful sampling 
# methods
# with the provided technology. This article is mainly for data scientists and data 
# engineers
# looking to use the newest enhancements of Apache Spark in the sub-area of sampling.

# If the sample() is used, simple random sampling is applied, and each element in the 
# dataset has an equal chance of being selected. Variable selection is made from the
# dataset at the fraction rate specified randomly without grouping or clustering on
# the basis of any variable. This method works with 3 parameters.

# The withReplacement parameter is set to False by default, so the element can only be 
# selected as a sample once. If this value is changed to True, it is possible to 
# select
# a sample value in the same sampling again.

# There may be a slight difference between the number of withReplacement = True 
# and 
# withReplacement = False since the elements can be selected more than once.
# Another parameter, the fraction field that is required to be filled, and as 
# stated in 
# Spark’s official documentation, it may not be divided by the specified 
# percentage value.
# If any number is assigned to the seed field, it can be thought of as 
# assigning a special 
# id to that sampling. In this way, the same sample is selected every 
# time the script is run.
# If this value is left as None, a different sampling group is created 
# each time.


sample1 = df_csv1.sample(False, 0.2, 42)

In [29]:
sample1.count()

935

In [30]:
sample2 = sample1.withColumn('balance_new',sample1.balance/2.0)

In [31]:
sample2.select('balance','balance_new').show(10)

+-------+-----------+
|balance|balance_new|
+-------+-----------+
|    147|       73.5|
|   4073|     2036.5|
|   -221|     -110.5|
|    627|      313.5|
|    229|      114.5|
|   3935|     1967.5|
|   -849|     -424.5|
|    844|      422.0|
|    101|       50.5|
|   1355|      677.5|
+-------+-----------+
only showing top 10 rows



In [32]:
# 11. Drop the newly created column using the following command:

sample2 = sample2.drop('balance_new')

In [33]:
# Exercise 49: Creating and Merging Two DataFrames

# 5. Add new, ID column. Generate it's values using method:
from pyspark.sql.functions import monotonically_increasing_id

In [34]:
train_df_id = df_csv1.withColumn('ID',monotonically_increasing_id())

In [36]:
# 6. Now, to merge the two DataFrames using the primary key (ID), first, we will have to
# split it into two DataFrames. Schemas of the dataframes should be as follows:
# train_with_id1('ID' and all columns except 'balance')
# train_with_id1('ID','balance')

train_df_id2 = train_df_id.select('ID','balance')
train_df_id1 = train_df_id.drop('balance')

In [37]:
# 7. Merge train_with_id1 and train_with_id2 using the following command:

train_merged = train_df_id1.join(train_df_id2, on=['ID'], how='left_outer')

In [38]:
# Exercise 50: Subsetting the DataFrame

# 1. Make a subset with records, where balance value is in range between 0 and 2000.

train_subset = df_csv1.filter((df_csv1.balance > 0.0) & (df_csv1.balance < 2000.0))

In [39]:
pd.DataFrame(train_subset.head(5))

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16
0,30,unemployed,married,primary,no,1787,no,no,cellular,19,oct,79,1,-1,0,unknown,no
1,35,management,single,tertiary,no,1350,yes,no,cellular,16,apr,185,1,330,1,failure,no
2,30,management,married,tertiary,no,1476,yes,yes,unknown,3,jun,199,4,-1,0,unknown,no
3,35,management,single,tertiary,no,747,no,no,cellular,23,feb,141,2,176,3,failure,no
4,36,self-employed,married,tertiary,no,307,yes,no,cellular,14,may,341,1,330,2,other,no
