# **Складишта на податоци и аналитичка обработка**

# Setting up PySpark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
# Presumably the directory where the openjdk-8-jdk-headless apt package places the JDK — confirm on the target image.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Directory created by unpacking spark-3.0.1-bin-hadoop2.7.tgz under /content.
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"
# NOTE(review): no visible cell downloads this JDBC jar or reads this variable — confirm it is still needed.
os.environ["PATH_TO_DRIVER_JAR"] = "/content/sqljdbc_8.2.2.0_enu/sqljdbc_8.2/enu/mssql-jdbc-8.2.2.jre8.jar"
# Python executable name used by PySpark for its Python processes.
os.environ["PYSPARK_PYTHON"] = 'python3'

## Initiate SparkContext

### Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

In [None]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running context when there is one, so re-running
# this cell no longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

## Initiate SQLContext

### SQLContext is a class that contains several useful functions to work with Spark SQL and it is an entry point to Spark SQL. Here we also add a file to be downloaded with this Spark job on every node (the file from the URL).

In [None]:
from pyspark.sql import SQLContext
from pyspark import SparkFiles

# Raw CSV hosted on GitHub; the trailing file name ("adult_data.csv") is what SparkFiles.get() is asked for later.
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
# Register the remote file with this Spark job so it is downloaded alongside it.
sc.addFile(url)
# Entry point whose .read interface is used to load the CSV files below.
sqlContext = SQLContext(sc)

### Read csv file. When header is set to true the first line of files will be used to name columns and will not be included in data. All types will be assumed automatically because of inferSchema=True.

In [None]:
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=True)

In [None]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



See several rows of the dataset

In [None]:
df.show(5, truncate=False)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv 

## Custom function to convert the data type of DataFrame columns
### Parameters are the dataframe, the columns to be converted, and the data type to which they will be converted. For every column, we cast it to the new data type and set the same column to be with the casted values

In [None]:
from pyspark.sql.types import *

def convertColumn(df, names, newType):
    """Cast the given columns of a DataFrame to a new data type.

    Args:
        df: DataFrame whose columns are converted.
        names: Iterable of column names to cast.
        newType: Target data type passed to ``Column.cast``.

    Returns:
        A DataFrame in which every column listed in ``names`` has been replaced
        by its cast version; the input frame is returned as-is when ``names``
        is empty.
    """
    converted = df
    for column_name in names:
        # Replace the column in place (same name) with its cast counterpart.
        casted = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, casted)
    return converted

## Convert continuous features to Float

In [None]:
# Columns holding continuous measurements; these are the ones cast to float below.
CONTINUOUS_FEATURES  = ['age', 'fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']
# NOTE(review): despite its name, df_string holds the float-cast frame; `df` itself is left unchanged here.
df_string = convertColumn(df, CONTINUOUS_FEATURES, FloatType())
df_string.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



## Different operations we can do with the columns

### Select some columns (as with SQL)

In [None]:
df.select('age','fnlwgt').show(5)

+---+------+
|age|fnlwgt|
+---+------+
| 25|226802|
| 38| 89814|
| 28|336951|
| 44|160323|
| 18|103497|
+---+------+
only showing top 5 rows



### Count number of rows by education level and then sort by the count

In [None]:
df.groupBy("education").count().sort("count", ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



### Show some descriptive statistics

In [None]:
df.describe().show()

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|summary|                 x|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|     capital-loss|    hours-per-week|native-country|income|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|  

### Show descriptive statistics for specific column



In [None]:
df.describe('capital-gain').show()

+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655413|
|    min|                 0|
|    max|             99999|
+-------+------------------+



### Cross-tabular reports are matrix-like or spreadsheet-like reports. These reports are useful for presenting summary numeric data. Here we do a Cross-tabular report between 2 pairwise columns (number of people grouped by age, with income below or above 50k) and sort the results by age. We can see that as the age increases, there are more and more people with income above 50K. 

In [None]:
df.crosstab('age', 'income').sort("age_income").show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|        17|  595|   0|
|        18|  862|   0|
|        19| 1050|   3|
|        20| 1112|   1|
|        21| 1090|   6|
|        22| 1161|  17|
|        23| 1307|  22|
|        24| 1162|  44|
|        25| 1119|  76|
|        26| 1068|  85|
|        27| 1117| 115|
|        28| 1101| 179|
|        29| 1025| 198|
|        30| 1031| 247|
|        31| 1050| 275|
|        32|  957| 296|
|        33| 1045| 290|
|        34|  949| 354|
|        35|  997| 340|
|        36|  948| 400|
+----------+-----+----+
only showing top 20 rows



### This is how we can drop columns if they are unnecessary (here we actually drop a column on a copy of the dataframe because it is only done as an example and we need this column)

In [None]:
df.drop('educational-num').columns

['x',
 'age',
 'workclass',
 'fnlwgt',
 'education',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']

# Handling missing data

### The missing values are populated with a question mark (the string '?'). We need those values to be null so that we can use built in functions from the PySpark DataFrame API to make the handling of missing data more convenient. So we replace '?' with None.

In [None]:
df = df.replace('?', None)

df.show()

+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  x|age|       workclass|fnlwgt|   education|educational-num|    marital-status|       occupation| relationship|              race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  1| 25|         Private|226802|        11th|              7|     Never-married|Machine-op-inspct|    Own-child|             Black|  Male|           0|           0|            40| United-States| <=50K|
|  2| 38|         Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|      Husband|             White|  Male|           0|           0|            50| United-

### Dropping all null values. We can do this but in the process we also lose a lot of information (because the rows we drop have values in the columns that are not null).

In [None]:
df.na.drop().show()

+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|  x|age|       workclass|fnlwgt|   education|educational-num|    marital-status|       occupation| relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|  1| 25|         Private|226802|        11th|              7|     Never-married|Machine-op-inspct|    Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|
|  2| 38|         Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|      Husband|White|  Male|           0|           0|            50| United-States| <=50K|
|  3| 28|       Local-gov|336951|  Assoc-acdm|    

### With `thresh=2`, a row is kept only if it has at least 2 non-null values; rows with fewer than 2 non-null values are dropped. This drops far fewer rows than dropping every row that contains a null, so we lose less information.

In [None]:
df.na.drop(thresh=2).show()

+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  x|age|       workclass|fnlwgt|   education|educational-num|    marital-status|       occupation| relationship|              race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  1| 25|         Private|226802|        11th|              7|     Never-married|Machine-op-inspct|    Own-child|             Black|  Male|           0|           0|            40| United-States| <=50K|
|  2| 38|         Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|      Husband|             White|  Male|           0|           0|            50| United-

### Filling nulls with a custom value

In [None]:
df.na.fill('NA').show()

+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  x|age|       workclass|fnlwgt|   education|educational-num|    marital-status|       occupation| relationship|              race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  1| 25|         Private|226802|        11th|              7|     Never-married|Machine-op-inspct|    Own-child|             Black|  Male|           0|           0|            40| United-States| <=50K|
|  2| 38|         Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|      Husband|             White|  Male|           0|           0|            50| United-

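## A common transformation in statistical analysis with grouped data is to replace missing data within each group with the mean of the non-NaN values in the group. Here we apply the simpler, ungrouped variant: each missing value is replaced with the mean of its whole column.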

### Custom function for finding the mean of all columns we pass in as an argument. Returns a list of lists with 2 elements, the column name and the mean value.

In [None]:
from pyspark.sql.functions import avg

def mean_of_pyspark_columns(df, numeric_cols, verbose=False):
    """Compute the mean of each requested column of a Spark DataFrame.

    All means are computed in a single aggregation (one Spark job) instead of
    launching one job and one RDD round-trip per column.

    Args:
        df: DataFrame to aggregate.
        numeric_cols: Iterable of column names to average.
        verbose: When True, print each aggregate column name and its mean.

    Returns:
        A list of ``[column_name, mean]`` pairs in the order of
        ``numeric_cols``. An empty ``numeric_cols`` yields an empty list.
    """
    numeric_cols = list(numeric_cols)
    if not numeric_cols:
        # agg() needs at least one expression, so there is nothing to compute.
        return []

    means_df = df.agg(*[avg(df[name]) for name in numeric_cols])
    # A global aggregation yields exactly one row, with one value per requested column.
    means_row = means_df.first()

    col_with_mean = []
    for name, agg_name, mean_value in zip(numeric_cols, means_df.columns, means_row):
        if verbose:
            print(agg_name, "\t", mean_value)
        col_with_mean.append([name, mean_value])
    return col_with_mean

### We iterate column by column and replace the missing values with the column mean, while we leave the other values as they are

In [None]:
from pyspark.sql.functions import when, lit

def fill_missing_with_mean(df, numeric_cols):
    """Replace nulls in the given columns with each column's mean.

    Args:
        df: DataFrame to fill.
        numeric_cols: Names of the columns whose nulls are replaced.

    Returns:
        A DataFrame where, for every listed column, null entries hold the
        column mean and all other entries are unchanged.
    """
    filled = df
    for column_name, column_mean in mean_of_pyspark_columns(df, numeric_cols):
        current = filled[column_name]
        # Null entries take the mean; every other entry keeps its value.
        replacement = when(current.isNull(), lit(column_mean)).otherwise(current)
        filled = filled.withColumn(column_name, replacement)
    return filled

In [None]:
NUMERIC_COLS = ['age', 'fnlwgt', 'capital-gain', 'capital-loss', 'hours-per-week']

df = fill_missing_with_mean(df, NUMERIC_COLS)
df.show()

+---+----+----------------+--------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  x| age|       workclass|  fnlwgt|   education|educational-num|    marital-status|       occupation| relationship|              race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----+----------------+--------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  1|25.0|         Private|226802.0|        11th|              7|     Never-married|Machine-op-inspct|    Own-child|             Black|  Male|         0.0|         0.0|          40.0| United-States| <=50K|
|  2|38.0|         Private| 89814.0|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|      Husband|             White|  Male|         0.0|         0.0|        

## Fill missing values from categorical column with mode of the column

### Custom function for finding the mode of all columns we pass in as an argument. Returns a list of lists with 2 elements, the column name and the mode value.

In [None]:
def mode_of_pyspark_columns(df, cat_col_list, verbose=False):
    """Find the most frequent non-null value (mode) of each given column.

    Args:
        df: DataFrame to inspect.
        cat_col_list: Iterable of column names whose mode is wanted.
        verbose: When True, print the value counts and the mode of each column.

    Returns:
        A list of ``[column_name, mode]`` pairs in the order of
        ``cat_col_list``. A column with no non-null values has no mode and is
        left out of the result.
    """
    col_with_mode = []
    for col in cat_col_list:
        # Drop nulls for this column only. The input frame itself is never
        # narrowed, so each column's mode is computed over all of its own
        # non-null rows rather than over rows that survived earlier filters.
        non_null = df.filter(df[col].isNotNull())
        # One grouped count per column instead of one count() job per distinct value.
        counted = non_null.groupBy(col).count().collect()
        unique_values_with_count = [[row[0], row[1]] for row in counted]
        if not unique_values_with_count:
            continue
        # Sort unique values by their count, most frequent first.
        sorted_unique_values_with_count = sorted(
            unique_values_with_count, key=lambda pair: pair[1], reverse=True
        )
        mode_value = sorted_unique_values_with_count[0][0]

        if verbose:
            print(col, sorted_unique_values_with_count, " and mode is ", mode_value)
        col_with_mode.append([col, mode_value])
    return col_with_mode

### We iterate column by column and replace the missing values with the column mode, while we leave the other values as they are. Just like we did with the mean

In [None]:
from pyspark.sql.functions import when, lit

def fill_missing_with_mode(df, cat_col_list):
    """Replace nulls in the given columns with each column's mode.

    Args:
        df: DataFrame to fill.
        cat_col_list: Names of the columns whose nulls are replaced.

    Returns:
        A DataFrame where, for every column that has a mode, null entries hold
        that mode and all other entries are unchanged.
    """
    filled = df
    for column_name, column_mode in mode_of_pyspark_columns(df, cat_col_list):
        current = filled[column_name]
        # Null entries take the mode; every other entry keeps its value.
        replacement = when(current.isNull(), lit(column_mode)).otherwise(current)
        filled = filled.withColumn(column_name, replacement)
    return filled

In [None]:
CATE_COLS = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']

df = fill_missing_with_mode(df, CATE_COLS)
df.show()

+---+----+----------------+--------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  x| age|       workclass|  fnlwgt|   education|educational-num|    marital-status|       occupation| relationship|              race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----+----------------+--------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|  1|25.0|         Private|226802.0|        11th|              7|     Never-married|Machine-op-inspct|    Own-child|             Black|  Male|         0.0|         0.0|          40.0| United-States| <=50K|
|  2|38.0|         Private| 89814.0|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|      Husband|             White|  Male|         0.0|         0.0|        

### Counting number of rows filtered by some condition

In [None]:
df.filter(df.age > 40).count()

20211

### Descriptive statistics by group (mean capital gain grouped by marital status) reveal some information. We can see that people married to a member of the Armed Forces (Married-AF-spouse) have the highest capital gain. People who never married have the lowest.

In [None]:
df.groupby('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



## Create new features from existing ones

### Income is not a linear function of age. When people are young, their income is usually lower than in mid-life. After retirement, a household draws on its savings, meaning a decrease in income. To capture this pattern, we add the square of age as a new feature.

In [None]:
# NOTE(review): this wildcard import supplies `col` here and `asc` in a later cell. It also
# brings in names such as sum/min/max that shadow the Python builtins from this point on.
from pyspark.sql.functions import *

# New feature: the square of age, added as the column "age_square".
df = df.withColumn("age_square", col("age") ** 2)
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)



### Check row count for every country and sort them

In [None]:
df.groupby('native-country').agg({'native-country': 'count'}).sort(asc("count(native-country)")).show()

+--------------------+---------------------+
|      native-country|count(native-country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|Outlying-US(Guam-...|                   23|
|                Laos|                   23|
|          Yugoslavia|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

### Holand-Netherlands has only 1 observation

In [None]:
df.filter(df['native-country'] == 'Holand-Netherlands').count()

1

### When a group within a feature has only one observation, it brings no information to the model. On the contrary, it can lead to errors, so we remove that row (and with it the group).

In [None]:
df = df.filter(df['native-country'] != 'Holand-Netherlands')	

# Inferring new fields (rank, lag, moving average) for the Covid-19 cases in Germany




In [None]:
# ===== For the following examples, the covid_de.csv file is used =====
# - contains more numerical data
# - has a "date" column
#
# The file is read straight from its local path. It is not a file registered through
# sc.addFile(), so resolving the absolute path through SparkFiles.get() added nothing.
# NOTE(review): covid_de.csv must already exist at this path (e.g. uploaded to the session).
df_covid = sqlContext.read.csv("/content/covid_de.csv", header=True, inferSchema=True)

In [None]:
#columns
df_covid.printSchema()

root
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date: string (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- recovered: integer (nullable = true)



In [None]:
#first 20 rows
df_covid.show()

+------------------+------------------+---------+------+----------+-----+------+---------+
|             state|            county|age_group|gender|      date|cases|deaths|recovered|
+------------------+------------------+---------+------+----------+-----+------+---------+
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-03-27|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-03-28|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     F|2020-04-03|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-04-05|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-05-18|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-07-27|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-08-12|    1|     0|        1|
|Baden-Wuerttemberg|LK Alb-Donau-Kreis|    00-04|     M|2020-08-23|    1|     0|        1|

In [None]:
from pyspark.sql.window import Window # for initialising the sliding window
from pyspark.sql import functions as F # module containing some basic sql functionalities

# pyspark.sql.functions.rank()
# - gives the ranking of each row within its ordered partition
# - ties receive the same rank, and the following rank(s) are skipped
#
# Rank rows by number of cases (highest first), separately for each gender.
windowSpec = Window.partitionBy('gender').orderBy(F.desc('cases'))
(
    df_covid
    .withColumn("rank", F.rank().over(windowSpec))
    .show()
)

+-------------------+--------------------+---------+------+----------+-----+------+---------+----+
|              state|              county|age_group|gender|      date|cases|deaths|recovered|rank|
+-------------------+--------------------+---------+------+----------+-----+------+---------+----+
|             Bayern|         SK Muenchen|    35-59|     F|2020-03-26|   64|     0|       64|   1|
|             Bayern|         SK Muenchen|    35-59|     F|2020-03-31|   60|     1|       59|   2|
|Nordrhein-Westfalen|       LK Guetersloh|    35-59|     F|2020-06-17|   56|     0|       56|   3|
|             Bayern|         SK Muenchen|    15-34|     F|2020-03-25|   54|     0|       54|   4|
|             Bayern|         SK Muenchen|    35-59|     F|2020-03-25|   54|     0|       54|   4|
|             Bayern|         SK Muenchen|    15-34|     F|2020-03-31|   52|     0|       52|   6|
|             Bayern|         SK Muenchen|    35-59|     F|2020-03-27|   52|     0|       52|   6|
| Baden-Wu

In [None]:
"""
== pyspark.sql.functions.lag()
== fetches data from previous rows
== finding trends
== infering new dimensions for classification/regression
"""
# lag("cases", 7) reads the value found 7 rows earlier within the same state, ordered by date.
# NOTE(review): df_covid holds several rows per state and date, so the offset counts rows,
# not calendar days; aggregate to one row per state and date first if a true 7-day lag is wanted.
windowSpec = Window().partitionBy(['state']).orderBy('date')
dfWithLag = df_covid.withColumn("lag_week",F.lag("cases", 7).over(windowSpec))
# `date` is a string column, so this keeps rows whose date sorts after '2020-03-11'.
dfWithLag.filter(dfWithLag.date>'2020-03-11').show()

+--------------+--------------------+---------+------+----------+-----+------+---------+--------+
|         state|              county|age_group|gender|      date|cases|deaths|recovered|lag_week|
+--------------+--------------------+---------+------+----------+-----+------+---------+--------+
|Sachsen-Anhalt|LK Altmarkkreis S...|    15-34|     M|2020-03-12|    1|     0|        1|       2|
|Sachsen-Anhalt|LK Anhalt-Bitterfeld|    15-34|     M|2020-03-12|    1|     0|        1|       2|
|Sachsen-Anhalt|           LK Boerde|    60-79|     M|2020-03-12|    1|     0|        1|       1|
|Sachsen-Anhalt|             LK Harz|    35-59|     F|2020-03-12|    1|     0|        1|       2|
|Sachsen-Anhalt|             LK Harz|    35-59|     M|2020-03-12|    1|     0|        1|       1|
|Sachsen-Anhalt|    LK Salzlandkreis|    35-59|     M|2020-03-12|    1|     0|        1|       1|
|Sachsen-Anhalt|       LK Wittenberg|    15-34|     F|2020-03-12|    1|     0|        1|       1|
|Sachsen-Anhalt|    

In [None]:
"""
== pyspark.sql.functions.lead()
== fetches data from subsequent rows
== finding trends
== infering new dimensions for classification/regression
"""
# lead("cases", 31) reads the value found 31 rows later within the same state, ordered by date.
# NOTE(review): df_covid holds several rows per state and date, so the offset counts rows,
# not calendar days — it is not a one-month look-ahead on the raw data.
windowSpec = Window().partitionBy(['state']).orderBy('date')
# NOTE(review): the result reuses the name dfWithLag and so replaces the lag frame from the previous cell.
dfWithLag = df_covid.withColumn("lead_month",F.lead("cases", 31).over(windowSpec))
# `date` is a string column, so this keeps rows whose date sorts after '2020-03-11'.
dfWithLag.filter(dfWithLag.date>'2020-03-11').show()

+--------------+--------------------+---------+------+----------+-----+------+---------+----------+
|         state|              county|age_group|gender|      date|cases|deaths|recovered|lead_month|
+--------------+--------------------+---------+------+----------+-----+------+---------+----------+
|Sachsen-Anhalt|LK Altmarkkreis S...|    15-34|     M|2020-03-12|    1|     0|        1|         2|
|Sachsen-Anhalt|LK Anhalt-Bitterfeld|    15-34|     M|2020-03-12|    1|     0|        1|         5|
|Sachsen-Anhalt|           LK Boerde|    60-79|     M|2020-03-12|    1|     0|        1|         1|
|Sachsen-Anhalt|             LK Harz|    35-59|     F|2020-03-12|    1|     0|        1|         1|
|Sachsen-Anhalt|             LK Harz|    35-59|     M|2020-03-12|    1|     0|        1|         1|
|Sachsen-Anhalt|    LK Salzlandkreis|    35-59|     M|2020-03-12|    1|     0|        1|         1|
|Sachsen-Anhalt|       LK Wittenberg|    15-34|     F|2020-03-12|    1|     0|        1|         1|


In [None]:
# pyspark.sql.functions.mean()
# - returns the average value of a column over the window frame
# - with .rowsBetween(a, b), the frame runs from offset a to offset b relative to the
#   current row (negative = rows before it, positive = rows after it)
# - without an explicit frame, an ordered window runs from the start of the partition up
#   to the current row, and an unordered window covers the whole partition
#
# Centered moving average over 7 rows per state: 3 rows before, the current row, 3 rows after.
# NOTE(review): df_covid holds several rows per state and date, so this window spans rows,
# not calendar days; aggregate to one row per state and date first for a true 7-day average.
windowSpec = Window.partitionBy('state').orderBy('date').rowsBetween(-3, 3)
dfWithRoll = df_covid.withColumn("roll_7_cases", F.mean("cases").over(windowSpec))
# Filter on this frame's own column instead of borrowing one from the unrelated dfWithLag frame.
dfWithRoll.filter(dfWithRoll.date > '2020-03-11').show()

+--------------+--------------------+---------+------+----------+-----+------+---------+------------------+
|         state|              county|age_group|gender|      date|cases|deaths|recovered|      roll_7_cases|
+--------------+--------------------+---------+------+----------+-----+------+---------+------------------+
|Sachsen-Anhalt|LK Altmarkkreis S...|    15-34|     M|2020-03-12|    1|     0|        1|               1.0|
|Sachsen-Anhalt|LK Anhalt-Bitterfeld|    15-34|     M|2020-03-12|    1|     0|        1|               1.0|
|Sachsen-Anhalt|           LK Boerde|    60-79|     M|2020-03-12|    1|     0|        1|               1.0|
|Sachsen-Anhalt|             LK Harz|    35-59|     F|2020-03-12|    1|     0|        1|               1.0|
|Sachsen-Anhalt|             LK Harz|    35-59|     M|2020-03-12|    1|     0|        1|               1.0|
|Sachsen-Anhalt|    LK Salzlandkreis|    35-59|     M|2020-03-12|    1|     0|        1|1.1428571428571428|
|Sachsen-Anhalt|       LK Wi

In [None]:
# pyspark.sql.functions.sum()
# - works over a window frame in the same way as the mean function
# - .rowsBetween(Window.unboundedPreceding, Window.currentRow) makes the frame run from the
#   first row of the partition to the current row, giving a running total
#
# Cumulative number of cases per state, ordered by date.
windowSpec = (
    Window.partitionBy('state')
    .orderBy('date')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
dfWithRoll = df_covid.withColumn("cumulative_cases", F.sum("cases").over(windowSpec))
# Filter on this frame's own column instead of borrowing one from the unrelated dfWithLag frame.
dfWithRoll.filter(dfWithRoll.date > '2020-03-01').show()

+--------------+--------------------+---------+------+----------+-----+------+---------+----------------+
|         state|              county|age_group|gender|      date|cases|deaths|recovered|cumulative_cases|
+--------------+--------------------+---------+------+----------+-----+------+---------+----------------+
|Sachsen-Anhalt|  LK Burgenlandkreis|    60-79|     F|2020-03-10|    1|     0|        1|               1|
|Sachsen-Anhalt|       LK Saalekreis|    60-79|     F|2020-03-10|    1|     0|        1|               2|
|Sachsen-Anhalt|    LK Salzlandkreis|    35-59|     M|2020-03-10|    1|     0|        1|               3|
|Sachsen-Anhalt|            SK Halle|    15-34|     M|2020-03-10|    1|     0|        1|               4|
|Sachsen-Anhalt|            SK Halle|    35-59|     M|2020-03-10|    1|     0|        1|               5|
|Sachsen-Anhalt|        SK Magdeburg|    60-79|     M|2020-03-10|    1|     0|        1|               6|
|Sachsen-Anhalt|LK Anhalt-Bitterfeld|    15-34

In [None]:
# pyspark.sql.DataFrame.rollup()
# - gets the distribution of cases for each state and for each county in that state separately
#
# Rollup of the number of cases per state and per county within the state. In the result,
# a row with a null county is the subtotal of its state, and the row where both state and
# county are null is the grand total.
df_covid.rollup(df_covid['state'], df_covid['county']).sum('cases').sort('state', 'county').show()

+------------------+--------------------+----------+
|             state|              county|sum(cases)|
+------------------+--------------------+----------+
|              null|                null|    278043|
|Baden-Wuerttemberg|                null|     47879|
|Baden-Wuerttemberg|  LK Alb-Donau-Kreis|       877|
|Baden-Wuerttemberg|         LK Biberach|       777|
|Baden-Wuerttemberg|    LK Bodenseekreis|       522|
|Baden-Wuerttemberg|       LK Boeblingen|      2047|
|Baden-Wuerttemberg|LK Breisgau-Hochs...|      1443|
|Baden-Wuerttemberg|             LK Calw|       890|
|Baden-Wuerttemberg|      LK Emmendingen|       671|
|Baden-Wuerttemberg|         LK Enzkreis|       834|
|Baden-Wuerttemberg|        LK Esslingen|      2585|
|Baden-Wuerttemberg|     LK Freudenstadt|       656|
|Baden-Wuerttemberg|       LK Goeppingen|      1138|
|Baden-Wuerttemberg|       LK Heidenheim|       614|
|Baden-Wuerttemberg|        LK Heilbronn|      1319|
|Baden-Wuerttemberg|   LK Hohenlohekreis|     

## Encoding categorical variables with OneHot Encoding method

### One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using StringIndexer first.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Worked example on a single column ("workclass").
# 1) Fit a StringIndexer and attach the numeric label index to every row.
stringIndexer = StringIndexer(
    inputCol="workclass",
    outputCol="workclass_encoded",
)
model = stringIndexer.fit(df)
indexed = model.transform(df)

# 2) Turn the label index into a one-hot vector; dropLast=False keeps a slot
#    for every category instead of dropping the last one.
encoder = OneHotEncoder(
    inputCol="workclass_encoded",
    outputCol="workclass_vec",
    dropLast=False,
)
encoded = encoder.fit(indexed).transform(indexed)
encoded.show(5)

+---+----+---------+--------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+----------+-----------------+-------------+
|  x| age|workclass|  fnlwgt|   education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|age_square|workclass_encoded|workclass_vec|
+---+----+---------+--------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+----------+-----------------+-------------+
|  1|25.0|  Private|226802.0|        11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|         0.0|         0.0|          40.0| United-States| <=50K|     625.0|              0.0|(8,[0],[1.0])|
|  2|38.0|  Private| 89814.0|     HS-grad|              9|Married-civ-spouse|  Farming-f

## We will now build the pipeline, which consists of 4 steps:
### 1. encoding the categorical data
### 2. indexing the label feature
### 3. Adding continuous variable
### 4. Assemble the steps

### Next we encode all categorical features (previous cell was an example for one column only)

In [None]:
# Re-check the column types before building the pipeline; the string-typed
# columns are the ones that need to be indexed/encoded.
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)



### We index all categorical columns with the StringIndexer. StringIndexer encodes a string column of labels to a column of label indices. StringIndexer can encode multiple columns. The indices are in [0, numLabels). Next we One-Hot encode the indexed columns. We add these 2 stages in the pipeline for all columns.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder

# Categorical (string) columns that are indexed and then one-hot encoded.
CATE_FEATURES = [
    'workclass', 'education', 'marital-status', 'occupation',
    'relationship', 'race', 'gender', 'native-country',
]
stages = []  # ordered Pipeline stages; reset here so re-running this cell starts clean

for categoricalCol in CATE_FEATURES:
    # string value -> numeric label index, stored in "<column>Index"
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=f"{categoricalCol}Index",
    )
    # numeric label index -> one-hot vector, stored in "<column>classVec"
    encoder = OneHotEncoder(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[f"{categoricalCol}classVec"],
    )
    stages.extend([stringIndexer, encoder])

### Indexing the target variable(income)

In [None]:
# Encode the string target column "income" as a numeric label column "newincome".
label_stringIdx =  StringIndexer(inputCol="income", outputCol="newincome")
# NOTE: `+=` appends in place, so re-running this cell without first re-running
# the cell that resets `stages` adds a second copy of this stage.
stages += [label_stringIdx]

### Adding the continuous variables to the input for the VectorAssembler. VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.

In [None]:
# Inputs for the VectorAssembler: the one-hot vector columns first, followed by
# the raw numeric columns.
# NOTE(review): CONTINUOUS_FEATURES is not defined in this cell; it has to exist
# already from an earlier cell.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTINUOUS_FEATURES

# Display the resulting list of column names.
assemblerInputs

['workclassclassVec',
 'educationclassVec',
 'marital-statusclassVec',
 'occupationclassVec',
 'relationshipclassVec',
 'raceclassVec',
 'genderclassVec',
 'native-countryclassVec',
 'age',
 'fnlwgt',
 'capital-gain',
 'educational-num',
 'capital-loss',
 'hours-per-week']

### We add the Assembler to the pipeline stages

In [None]:
# Concatenate all input columns into one vector column named "features".
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# NOTE: appends in place; re-running this cell alone adds the assembler twice.
stages += [assembler]	

### We create the pipeline. MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow.

In [None]:
# Chain all collected stages into one Pipeline.
pipeline = Pipeline(stages=stages)
# Fit every stage on df.
pipelineModel = pipeline.fit(df)
# NOTE: despite its name, `model` is the transformed DataFrame (df plus the
# columns added by the stages), not a fitted model.
model = pipelineModel.transform(df)

In [None]:
# Inspect the first transformed row, including the index, one-hot and
# "features" columns added by the pipeline.
model.take(1)

[Row(x=1, age=25.0, workclass='Private', fnlwgt=226802.0, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0.0, capital-loss=0.0, hours-per-week=40.0, native-country='United-States', income='<=50K', age_square=625.0, workclassIndex=0.0, workclassclassVec=SparseVector(7, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(13, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(39, {0: 1.0}), newincome=0.0, features=SparseVector(96, {0: 1.0, 12: 1.0, 23: 1.0, 34: 1.0, 43: 1.0, 47: 1.0, 50: 1.0, 51: 1.0, 90: 25.0, 91: 2

## Building the classifier

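### For faster computation we build a new, slimmer DataFrame from the transformed data that keeps only the label and the feature vector (converted to a dense vector)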

In [None]:
from pyspark.ml.linalg import DenseVector

# Keep only the numeric label and a dense copy of the assembled feature vector.
input_data = model.rdd.map(
    lambda row: (row["newincome"], DenseVector(row["features"]))
)
# Rebuild a two-column DataFrame; the label column is named "income" again.
df_train = sqlContext.createDataFrame(input_data, schema=["income", "features"])

df_train.show()

+------+--------------------+
|income|            features|
+------+--------------------+
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[0.0,0.0,1.0,0.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[0.0,1.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,1.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
+------+--------------------+
only showing top 20 rows



### Split the dataset to train and test

In [None]:
# Mark df_train for caching, since it is read again by the split and the steps below.
df_train = df_train.cache()
# Random split with weights 0.8 / 0.2; the fixed seed keeps the split repeatable.
train_data, test_data = df_train.randomSplit([.8,.2], seed=43)

In [None]:
# Preview the first rows of the training split.
train_data.show()

+------+--------------------+
|income|            features|
+------+--------------------+
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
+------+--------------------+
only showing top 20 rows



### Count how many people have an income below/above 50k in the training set and in the test set. We inspect this to check whether the classes are severely imbalanced; they are not (roughly 76% / 24% in both sets).

In [None]:
# Number of training rows per label value (0.0 / 1.0).
train_data.groupby('income').agg({'income': 'count'}).show()

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|        29823|
|   1.0|         9360|
+------+-------------+



In [None]:
# Number of test rows per label value (0.0 / 1.0).
test_data.groupby('income').agg({'income': 'count'}).show()

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|         7331|
|   1.0|         2327|
+------+-------------+



## Building the logistic regressor

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the "features" vector with "income" as the label,
# capped at 10 iterations and regularised with regParam=0.3.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="income",
    maxIter=10,
    regParam=0.3,
)

# Fit on the training split only; the test split is kept for evaluation.
linearModel = lr.fit(train_data)

In [None]:
# Show the fitted coefficient vector and the intercept.
print(f"Coefficients: {linearModel.coefficients}")
print(f"Intercept: {linearModel.intercept}")

Coefficients: [-0.12863364737077815,-0.18156806522743066,-0.04011268487727221,-0.11184537380290616,0.13213474349308152,0.1866422680189497,-0.27219432777404845,-0.19851434232830123,-0.07817198008047521,0.22194275665196056,0.4024938302207757,-0.01680811784852384,-0.3145732573558494,-0.00929292020184624,-0.33608778224610486,-0.43699620705411746,0.582673291996581,-0.3935466022217571,-0.25802623651736734,0.626310030431398,-0.3584820060218133,-0.37760875525077325,0.3261633636266663,-0.3567986728235619,-0.19882153370674402,-0.20364462403810552,-0.1743333724220145,-0.1263040224593002,0.05955406759367189,-0.06058544108354798,0.2911608786861292,-0.12514190916745033,0.03952099426009896,-0.28550335022504864,-0.20182952013358166,-0.10953164250024859,-0.2925047964648147,-0.2979915731048789,0.09660487829940864,0.09734375537174352,-0.2792302660252221,0.2731507597582963,-0.19648115257313148,-0.301806393536135,-0.24566737167384553,0.393287654596328,-0.07134665951519187,-0.17958908989355912,-0.0805842947

### We predict the values of the target variable in the test set. Next we evaluate the model with commonly used metrics such as accuracy and the area under the ROC curve

In [None]:
# Score the held-out test split with the fitted model.
predictions = linearModel.transform(test_data)
# The schema shows which columns the model added next to "income" and "features".
predictions.printSchema()

root
 |-- income: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



### We see the actual values and the obtained predictions and the probabilities associated with the predicted values

In [None]:
# Compare the true label, the predicted label and the probability vector side by side.
selected = predictions.select("income", "prediction", "probability")
selected.show(20)

+------+----------+--------------------+
|income|prediction|         probability|
+------+----------+--------------------+
|   0.0|       0.0|[0.89266953321816...|
|   0.0|       0.0|[0.90112462964047...|
|   0.0|       0.0|[0.87226367190461...|
|   0.0|       0.0|[0.94709055173955...|
|   0.0|       0.0|[0.92235587448879...|
|   0.0|       0.0|[0.75499160720068...|
|   0.0|       0.0|[0.64682247958949...|
|   0.0|       0.0|[0.89693809880161...|
|   0.0|       0.0|[0.88648295030040...|
|   0.0|       1.0|[0.44112817660515...|
|   0.0|       0.0|[0.88906834120282...|
|   0.0|       0.0|[0.88716899294267...|
|   0.0|       0.0|[0.84065909307684...|
|   0.0|       0.0|[0.87941782877909...|
|   0.0|       0.0|[0.85191091812879...|
|   0.0|       0.0|[0.68702019613462...|
|   0.0|       0.0|[0.86161527231472...|
|   0.0|       0.0|[0.82374352560222...|
|   0.0|       0.0|[0.59278596452410...|
|   0.0|       0.0|[0.55667153466019...|
+------+----------+--------------------+
only showing top

### Check the number of instances of a class in the label and the prediction.

In [None]:
# Keep just the true label and the predicted label.
cm = predictions.select("income", "prediction")	
# Number of test rows per true label value.
cm.groupby('income').agg({'income': 'count'}).show()

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|         7331|
|   1.0|         2327|
+------+-------------+



In [None]:
# Number of test rows per predicted label value.
cm.groupby('prediction').agg({'prediction': 'count'}).show()	

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             8786|
|       1.0|              872|
+----------+-----------------+



### The accuracy of a machine learning classification algorithm is one way to measure how often the algorithm classifies a data point correctly. We can compute the accuracy as the number of rows whose label is predicted correctly divided by the total number of rows.

In [None]:
# Accuracy = rows where the true label equals the prediction / all rows.
cm.filter(cm.income == cm.prediction).count() / cm.count()

0.8153862083247049

### We can wrap previous steps in a function

In [None]:
def accuracy_m(model, data=None):
    """Print the classification accuracy of a fitted model.

    Args:
        model: Fitted object exposing ``transform(dataset)``. The transformed
            output must contain the columns "income" (true label) and
            "prediction".
        data: Dataset to score. When omitted, the notebook-level ``test_data``
            is used (looked up at call time), so existing
            ``accuracy_m(model=...)`` calls behave exactly as before.

    Returns:
        None. The accuracy is printed as a percentage with three decimals.
    """
    if data is None:
        data = test_data
    predictions = model.transform(data)
    cm = predictions.select("income", "prediction")
    # Accuracy = correctly classified rows / all scored rows.
    correct = cm.filter(cm.income == cm.prediction).count()
    total = cm.count()
    acc = correct / total
    print("Model accuracy: %.3f%%" % (acc * 100))

# Accuracy of the logistic regression fitted above.
accuracy_m(model = linearModel)

Model accuracy: 81.539%


## ROC metrics
### The Receiver Operating Characteristic curve is another common tool used with binary classification. It is very similar to the precision/recall curve, but instead of plotting precision versus recall, the ROC curve shows the true positive rate (i.e. recall) against the false positive rate. The false positive rate is the ratio of negative instances that are incorrectly classified as positive. It is equal to one minus the true negative rate. The true negative rate is also called specificity. Hence the ROC curve plots sensitivity (recall) versus 1 - specificity

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Point the evaluator at the existing "income" label column instead of renaming
# that column on `predictions`: rebinding `predictions` with a renamed column
# makes the earlier cells that select "income" fail when they are re-run.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="income")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8900027005897135
areaUnderROC


## Tuning hyperparameters
### Hyperparameter optimization or tuning is the problem of choosing a set of optimal hyperparameters for a learning algorithm. A hyperparameter is a parameter whose value is used to control the learning process. By contrast, the values of other parameters are learned. The same kind of machine learning model can require different constraints, weights or learning rates to generalize different data patterns.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Candidate regularisation strengths to compare during cross-validation.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

# Import only the name that is used; a star import would dump every public
# name of the `time` module into the notebook namespace.
from time import time

start_time = time()

# 5-fold CrossValidator. The fixed seed makes the fold assignment, and with it
# the selected model, repeatable across runs.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=43)
# Expose the target under the name "label" as well ("label" is the default
# label column name of Spark ML evaluators). Referencing the column through the
# DataFrame avoids depending on `col` having been imported by an earlier cell.
train_data = train_data.withColumn("label", train_data["income"])
# Run cross-validation.
cvModel = cv.fit(train_data)
end_time = time()
elapsed_time = end_time - start_time
print(f"Time to train model: {elapsed_time} seconds")

Time to train model: 42.0366473197937 seconds


### The accuracy of the model with the best hyperparameters found is slightly better (by about 3 percentage points).

In [None]:
# Accuracy of the cross-validated model, for comparison with the first fit.
accuracy_m(model = cvModel)

Model accuracy: 84.479%


### We can see all of the parameters of the model with optimal hyperparameters

In [None]:
# The best model selected by cross-validation.
bestModel = cvModel.bestModel
# Display all of its parameters and their values.
bestModel.extractParamMap()

{Param(parent='LogisticRegression_edc4546ae281', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_edc4546ae281', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_edc4546ae281', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_edc4546ae281', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_edc4546ae281', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_edc4546ae281', name='labelCol', doc='label column name.'): 'income',
 Param(parent='LogisticRegression_edc4546ae281', name='maxIter', doc='max number of iterations (>= 0).')