# PySpark Basics

### Webscraping from GDELT page

Parse the listing of daily CSV files and the column header file.

- Daily data: http://data.gdeltproject.org/events/index.html
- Column data: https://www.gdeltproject.org/data/lookups/CSV.header.dailyupdates.txt

### Install beautifulsoup and write function to pull daily csv files

In [1]:
pip install beautifulsoup4

Note: you may need to restart the kernel to use updated packages.


In [2]:
url = 'http://data.gdeltproject.org/events/index.html'

import requests
import bs4

import sys
sys.setrecursionlimit(100000) # Raise the recursion limit (Python default = 1000) because the page contains a very large number of child elements

def get_page(url):
    web_pages = requests.get(url).text
    soup = bs4.BeautifulSoup(web_pages, 'html.parser')
    return soup

soup = get_page(url)

In [3]:
# Returns all file links listed on the GDELT page

def get_files(soup):
    return soup.find_all('a')

get_files(get_page(url))

[<a href="md5sums">md5sums</a>,
 <a href="filesizes">filesizes</a>,
 <a href="GDELT.MASTERREDUCEDV2.1979-2013.zip">GDELT.MASTERREDUCEDV2.1979-2013.zip</a>,
 <a href="20210626.export.CSV.zip">20210626.export.CSV.zip</a>,
 <a href="20210625.export.CSV.zip">20210625.export.CSV.zip</a>,
 <a href="20210624.export.CSV.zip">20210624.export.CSV.zip</a>,
 <a href="20210623.export.CSV.zip">20210623.export.CSV.zip</a>,
 <a href="20210622.export.CSV.zip">20210622.export.CSV.zip</a>,
 <a href="20210621.export.CSV.zip">20210621.export.CSV.zip</a>,
 <a href="20210620.export.CSV.zip">20210620.export.CSV.zip</a>,
 <a href="20210619.export.CSV.zip">20210619.export.CSV.zip</a>,
 <a href="20210618.export.CSV.zip">20210618.export.CSV.zip</a>,
 <a href="20210617.export.CSV.zip">20210617.export.CSV.zip</a>,
 <a href="20210616.export.CSV.zip">20210616.export.CSV.zip</a>,
 <a href="20210615.export.CSV.zip">20210615.export.CSV.zip</a>,
 <a href="20210614.export.CSV.zip">20210614.export.CSV.zip</a>,
 <a href="20
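The listing mixes helper files (`md5sums`, `filesizes`, the 1979-2013 archive) with the daily exports. As a minimal sketch, one way to keep only the daily files is to match the `YYYYMMDD.export.CSV.zip` naming pattern visible in the output above rather than relying on position in the list. The helper name `daily_exports` is illustrative and not part of the notebook.

```python
import re

# Daily GDELT exports are named like 20210626.export.CSV.zip
DAILY_EXPORT = re.compile(r"^\d{8}\.export\.CSV\.zip$")

def daily_exports(hrefs):
    """Return only the hrefs that look like daily export archives, in page order."""
    return [h for h in hrefs if DAILY_EXPORT.match(h)]

hrefs = [
    "md5sums",
    "filesizes",
    "GDELT.MASTERREDUCEDV2.1979-2013.zip",
    "20210626.export.CSV.zip",
    "20210625.export.CSV.zip",
]
print(daily_exports(hrefs))
```

With the parsed page, the same helper could be fed `a['href'] for a in soup.find_all('a', href=True)`.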

### File download
Set the window (in days) of files you want to download. The file list is ordered newest first:
- end_point: integer index of the most recent file to include; 0 is the latest available file (usually yesterday's file, as GDELT uploads data in daily batches)
- start_point: integer index at which to stop (exclusive); with end_point = 0 this equals the number of daily files to download
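To make the two settings concrete, here is a small sketch of the selection they describe, assuming the file list is ordered newest first as on the GDELT page. `select_window` is a hypothetical helper, not something the notebook defines.

```python
def select_window(files, end_point, start_point):
    """Pick files[end_point:start_point] from a newest-first list of file names."""
    if not 0 <= end_point <= start_point:
        raise ValueError("expected 0 <= end_point <= start_point")
    return files[end_point:start_point]

# Six consecutive daily files, newest first (20210626 down to 20210621)
files = [f"202106{day:02d}.export.CSV.zip" for day in range(26, 20, -1)]

print(select_window(files, 0, 3))  # the three most recent files
print(select_window(files, 2, 4))  # skips the two newest, then takes two
```

The number of files selected is `start_point - end_point`, so `end_point = 0` and `start_point = 20` select the 20 most recent daily files.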

In [4]:
# Specify the files you want to download 
end_point = 0
start_point = 20

In [5]:
# Create download links
all_links = soup.find_all('a', href=True)
all_links = all_links[3:]  # skip md5sums, filesizes and the 1979-2013 archive
urls = []
download_links = []
base_url = 'http://data.gdeltproject.org/events/'

for i in range(end_point, start_point):
    href = all_links[i]['href']
    urls.append(href)
    download_links.append(base_url + href)  # indexing urls[i] would fail whenever end_point > 0
    print(href)

download_links

20210626.export.CSV.zip
20210625.export.CSV.zip
20210624.export.CSV.zip
20210623.export.CSV.zip
20210622.export.CSV.zip
20210621.export.CSV.zip
20210620.export.CSV.zip
20210619.export.CSV.zip
20210618.export.CSV.zip
20210617.export.CSV.zip
20210616.export.CSV.zip
20210615.export.CSV.zip
20210614.export.CSV.zip
20210613.export.CSV.zip
20210612.export.CSV.zip
20210611.export.CSV.zip
20210610.export.CSV.zip
20210609.export.CSV.zip
20210608.export.CSV.zip
20210607.export.CSV.zip


['http://data.gdeltproject.org/events/20210626.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210625.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210624.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210623.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210622.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210621.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210620.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210619.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210618.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210617.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210616.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210615.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210614.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210613.export.CSV.zip',
 'http://data.gdeltproject.org/events/20210612.export.CSV.zip',
 'http://data.gdeltproject.org/events/20
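The cells above only build the links; the download itself is not shown. The following is a rough sketch of one way to fetch and unpack the archives using only the standard library. The function names and the `gdelt_data` target folder are assumptions made for illustration.

```python
import io
import urllib.request
import zipfile
from pathlib import Path

def extract_archive(zip_bytes, target_dir):
    """Unpack every member of an in-memory zip archive and return the extracted paths."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        return [Path(archive.extract(name, path=target)) for name in archive.namelist()]

def download_and_extract(links, target_dir="gdelt_data"):
    """Fetch each archive URL and unpack it into target_dir (needs network access)."""
    paths = []
    for link in links:
        with urllib.request.urlopen(link) as response:
            paths.extend(extract_archive(response.read(), target_dir))
    return paths
```

A call such as `csv_paths = download_and_extract(download_links)` would then return the local paths of the extracted files.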

### Dataframe creation

- First, we compile the list of column names for the pandas dataframe from this header file (saved locally as a tab-delimited text file): https://www.gdeltproject.org/data/lookups/CSV.header.dailyupdates.txt
- Then we create a dataframe from the daily GDELT files that were selected for download
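As a sketch of the first step, the column names can also be obtained by splitting the header text directly. This assumes the header file is a single tab-delimited line of names, which is what reading it with `sep='\t'` and taking `.columns` implies. `parse_header` is an illustrative helper.

```python
def parse_header(text):
    """Split the first non-empty line of a tab-delimited header file into column names."""
    for line in text.splitlines():
        if line.strip():
            return [name.strip() for name in line.split("\t")]
    return []

# Shortened stand-in for the real header line
sample = "GLOBALEVENTID\tSQLDATE\tMonthYear\tYear\tFractionDate\n"
print(parse_header(sample))
```

The text could come from the local copy of the file or from `requests.get(...).text` on the header URL.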

In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [7]:
# Read tab-delimited csv file for data columns

columns = pd.read_csv('./Daily Conflict Data Columns.txt',sep='\t')
column_names = list(columns.columns)

### Spark Setup

- Processing large volumes of data from many GDELT files benefits from distributed computing with Spark
- First, we import the necessary libraries and start a Spark session

In [8]:
import pyspark

# Initiate Spark session (appName is only a display label, so a plain name is used rather than a file path)
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName('conflict_data_extract') \
    .getOrCreate()

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, FloatType

schema = StructType([ \
StructField("GLOBALEVENTID",StringType(),True), \
StructField("SQLDATE",DateType(),True), \
StructField("MonthYear",FloatType(),True), \
StructField("Year", IntegerType(), True), \
StructField("FractionDate", FloatType(), True), \
StructField("Actor1Code", StringType(), True), \
StructField("Actor1Name", StringType(),True), \
StructField("Actor1CountryCode", StringType(),True), \
StructField("Actor1KnownGroupCode", StringType(),True), \
                     
StructField("Actor1EthnicCode", StringType(), True), \
StructField("Actor1Religion1Code", StringType(), True), \
StructField("Actor1Religion2Code", StringType(), True), \
StructField("Actor1Type1Code",StringType(),True), \
StructField("Actor1Type2Code",StringType(),True), \
StructField("Actor1Type3Code",StringType(),True), \
StructField("Actor2Code", StringType(), True), \
StructField("Actor2Name", StringType(), True), \
StructField("Actor2CountryCode", StringType(), True), \
StructField("Actor2KnownGroupCode", StringType(), True), \

StructField("Actor2EthnicCode", StringType(), True), \
StructField("Actor2Religion1Code", StringType(), True), \
StructField("Actor2Religion2Code", StringType(), True), \
StructField("Actor2Type1Code",StringType(),True), \
StructField("Actor2Type2Code",StringType(),True), \
StructField("Actor2Type3Code",StringType(),True), \
StructField("IsRootEvent", FloatType(), True), \
StructField("EventCode", StringType(), True),      # CAMEO codes can have leading zeros (e.g. "010"),
StructField("EventBaseCode", StringType(), True),  # so they are kept as strings rather than floats
StructField("EventRootCode", StringType(), True),

StructField("QuadClass", FloatType(), True), \
StructField("GoldsteinScale", FloatType(), True), \
StructField("NumMentions", FloatType(), True), \
StructField("NumSources",FloatType(),True), \
StructField("NumArticles",FloatType(),True), \
StructField("AvgTone",FloatType(),True), \
StructField("Actor1Geo_Type", IntegerType(), True), \
StructField("Actor1Geo_FullName", StringType(), True), \
StructField("Actor1Geo_CountryCode", StringType(), True), \
StructField("Actor1Geo_ADM1Code", StringType(), True), \

StructField("Actor1Geo_Lat", FloatType(), True), \
StructField("Actor1Geo_Long", FloatType(), True), \
StructField("Actor1Geo_FeatureID", StringType(), True), \
StructField("Actor2Geo_Type",IntegerType(),True), \
StructField("Actor2Geo_FullName",StringType(),True), \
StructField("Actor2Geo_CountryCode",StringType(),True), \
StructField("Actor2Geo_ADM1Code", StringType(), True), \
StructField("Actor2Geo_Lat", FloatType(), True), \
StructField("Actor2Geo_Long", FloatType(), True), \
StructField("Actor2Geo_FeatureID", StringType(), True), \
  
StructField("ActionGeo_Type", IntegerType(), True), \
StructField("ActionGeo_FullName", StringType(), True), \
StructField("ActionGeo_CountryCode", StringType(), True), \
StructField("ActionGeo_ADM1Code",StringType(),True), \
StructField("ActionGeo_Lat",FloatType(),True), \
StructField("ActionGeo_Long",FloatType(),True), \
StructField("ActionGeo_FeatureID", StringType(), True), \
StructField("DATEADDED", DateType(), True), \
StructField("SOURCEURL", StringType(), True)])


In [10]:
url = "http://data.gdeltproject.org/events/20210623.export.CSV.zip"

from pyspark import SparkFiles
spark.sparkContext.addFile(url)

df = spark.read.options(delimiter='\t').csv("file://"+SparkFiles.get("20210623.export.CSV.zip"), 
                                            header='false', schema = schema)

In [11]:
df.show()

+--------------------+-------+---------+----+------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+-----------+---------+-------------+-------------+---------+--------------+-----------+----------+-----------+-------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+---------+---------+
|       GLOBALEVENTID|SQLDATE|MonthYear|Year|FractionDate|Actor1Code|Actor1Name|Actor1CountryCode|A

### Issues and Next Steps
- Read the csv file directly from the URL into a dataframe (the read itself runs, but the values in df are shown as null)
- Set up iterator to load and concatenate all downloaded dataframes
- Use Lat-Long data to create geometry point and geo dataframe
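A likely cause of the null values is that Spark's CSV reader does not unpack `.zip` archives, so the zipped file is not parsed as tab-delimited text. A second possible cause is the date columns: `DateType` fields are parsed with the reader's `dateFormat` option, and GDELT dates look like `20210623`. The sketch below assumes the archives have already been extracted to plain CSV files and reads them all in one call, which also covers the "load and concatenate" step. `read_gdelt_csvs` is a hypothetical helper, and `spark` and `schema` are the objects defined earlier.

```python
def read_gdelt_csvs(spark, csv_paths, schema):
    """Read one or more extracted, tab-delimited GDELT files into a single DataFrame."""
    return (
        spark.read
        .options(delimiter="\t", dateFormat="yyyyMMdd")  # GDELT dates look like 20210623
        .csv(list(csv_paths), header=False, schema=schema)
    )
```

Because `DataFrameReader.csv` accepts a list of paths, no explicit loop or union is needed to combine the daily files.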

### Dataframe Transformation and Exploration

In [90]:
# Create dataframe from local csv (already in correct shape)
df = spark.read.option("header",True) \
     .csv("./conflict_data_extract.csv")

In [91]:
df.select('Year','Actor1Code','Actor2Code','GoldsteinScale','geometry').show()

+----+----------+----------+--------------+--------------------+
|Year|Actor1Code|Actor2Code|GoldsteinScale|            geometry|
+----+----------+----------+--------------+--------------------+
|2020|       LEG|      null|           4.0|POINT (-89.0022 4...|
|2020|       USA|       CVL|           4.0|POINT (-87.650099...|
|2020|       USA|       CVL|          -9.5|POINT (-89.0022 4...|
|2021|      null|       CVL|         -10.0|POINT (69.1833 34...|
|2021|      null|       GBR|           1.9|   POINT (-0.2 52.9)|
|2021|      null|       GBR|           1.9|POINT (-3.16667 5...|
|2021|       BUS|       MED|           0.4|POINT (-0.116667 ...|
|2021|       GBR|      null|           2.8|   POINT (-0.2 52.9)|
|2021|       GBR|      null|           2.8|POINT (-3.16667 5...|
|2021|       GOV|       GBR|          -4.4|   POINT (-0.2 52.9)|
|2021|       GOV|       GBR|          -4.4|POINT (-3.16667 5...|
|2021|       GOV|       MED|           0.0|        POINT (2 46)|
|2021|       TWN|      nu
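The `geometry` column holds points in WKT notation, which puts longitude before latitude. A minimal sketch of how such a string can be built from a latitude/longitude pair follows. Which pair of GDELT columns the extract actually used is not shown in this notebook, and `to_wkt_point` is an illustrative name.

```python
def to_wkt_point(lat, lon):
    """Format a latitude/longitude pair as a WKT point (WKT puts longitude first)."""
    if lat is None or lon is None:
        return None  # no geometry when a coordinate is missing
    return f"POINT ({lon} {lat})"

print(to_wkt_point(52.9, -0.2))
```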

In [55]:
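# Maximum Goldstein value for each Actor1/Actor2 pair
from pyspark.sql import functions as F

# The local csv was read without a schema, so GoldsteinScale is a string; cast it so max() compares numbers, not text
df.groupBy(["Actor1Code","Actor2Code"]).agg(F.max(F.col("GoldsteinScale").cast("float")).alias("max(GoldsteinScale)")).show()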

+----------+----------+-------------------+
|Actor1Code|Actor2Code|max(GoldsteinScale)|
+----------+----------+-------------------+
|       AFG|       CVL|                7.0|
|    CANLAB|      null|               -4.4|
|       EGY|    EGYBUS|                2.8|
|       FJI|    GOVHLH|                2.8|
|       GOV|       BFA|                2.8|
|       NZL|       SPY|                2.8|
|       VAT|    CHRCTH|                0.0|
|      null|       ELI|                2.8|
|      null|       PSE|                3.5|
|       COP|       MED|                0.0|
|       CRM|    USAJUD|                0.0|
|       DEU|    DEUCVL|                5.0|
|       ELI|      null|                3.4|
|    FRAGOV|       GBR|                3.0|
|    GBRGOV|    GBRELI|                2.8|
|       GOV|       AFR|                3.4|
|       NZL|       CVL|                4.0|
|    PRKGOV|       LAB|                0.0|
|       PSE|      null|                7.4|
|       REB|       OPP|         

In [56]:
# Example SQL query within Spark

# Register the dataframe as a temporary view (registerTempTable is deprecated)
df.createOrReplaceTempView('conflict_table')
newDF = spark.sql('SELECT * FROM conflict_table WHERE GoldsteinScale>5')
newDF.show()

+---+-------------+--------+---------+----+------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+----------+--------------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+-----------+---------+-------------+-------------+---------+--------------+-----------+----------+-----------+-------------------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+------------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+------------------+-------------------+---------+--------------------+--------------------+
|_c0|GLOBALEVENTID| SQLDATE|MonthYear|Yea

In [57]:
df.count()

10000

In [58]:
# Checking for NA values
from pyspark.sql.functions import isnull, when, count, col
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+---+-------------+-------+---------+----+------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+-----------+---------+-------------+-------------+---------+--------------+-----------+----------+-----------+-------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+---------+---------+--------+
|_c0|GLOBALEVENTID|SQLDATE|MonthYear|Year|FractionDate|Actor1Code|Actor1Name|Actor1CountryCod
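The null check relies on two behaviours: `when` without an `otherwise` yields null for rows that do not match, and `count` skips nulls, so each column ends up with the number of rows in which it is null. The same logic in plain Python, as a sketch over a list of dictionaries (`null_counts` is a hypothetical helper):

```python
def null_counts(rows, columns):
    """Count, per column, how many rows hold None (or lack the key entirely)."""
    return {c: sum(1 for row in rows if row.get(c) is None) for c in columns}

rows = [
    {"Actor1Code": "LEG", "Actor2Code": None},
    {"Actor1Code": "USA", "Actor2Code": "CVL"},
    {"Actor1Code": None, "Actor2Code": "GBR"},
]
print(null_counts(rows, ["Actor1Code", "Actor2Code"]))
```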

### Prepare Dataset for Model Creation

In [81]:
# Select mainly numerical features plus a few categorical ones (the actor codes)
df_model = df.select('MonthYear','Actor1Code','Actor2Code','NumMentions','QuadClass','GoldsteinScale','AvgTone')
df_model.show()

+---------+----------+----------+-----------+---------+--------------+-------------------+
|MonthYear|Actor1Code|Actor2Code|NumMentions|QuadClass|GoldsteinScale|            AvgTone|
+---------+----------+----------+-----------+---------+--------------+-------------------+
|   202006|       LEG|      null|         25|        1|           4.0|-0.7479237991763841|
|   202006|       USA|       CVL|         50|        1|           4.0|-0.7479237991763841|
|   202006|       USA|       CVL|         50|        4|          -9.5|-0.7479237991763841|
|   202105|      null|       CVL|          5|        4|         -10.0|  -3.37423312883435|
|   202105|      null|       GBR|          8|        1|           1.9|  -2.12042405373999|
|   202105|      null|       GBR|          8|        1|           1.9|  -2.12042405373999|
|   202105|       BUS|       MED|          3|        1|           0.4|-2.6737967914438503|
|   202105|       GBR|      null|         12|        1|           2.8|  -2.12042405373999|

In [60]:
# First, we create a classifier model, which needs a categorical prediction target:
# 0 - negative impact according to the Goldstein Scale
# 1 - positive impact according to the Goldstein Scale

df_model = df_model.withColumn("Impact", F.when((F.col('GoldsteinScale') > 0),1)\
                         .when((F.col('GoldsteinScale') < 0), 0))
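The labelling rule has a third outcome that is easy to miss: a Goldstein value of exactly 0 matches neither `when` branch, so `Impact` becomes null for those rows, and they are removed once rows with nulls are dropped. A plain-Python sketch of the same rule (`impact_label` is an illustrative name):

```python
def impact_label(goldstein):
    """Mirror the when/when chain: 1 for positive, 0 for negative, None otherwise."""
    if goldstein is None:
        return None
    if goldstein > 0:
        return 1
    if goldstein < 0:
        return 0
    return None  # exactly 0 matches neither branch

print([impact_label(g) for g in (4.0, -9.5, 0.0, None)])
```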

In [61]:
# Replace '?' placeholders with null, then drop every row that contains a null
df_model = df_model.replace('?', None)\
        .dropna(how='any')
df_model.show()

+---------+----------+----------+-----------+---------+--------------+-------------------+------+
|MonthYear|Actor1Code|Actor2Code|NumMentions|QuadClass|GoldsteinScale|            AvgTone|Impact|
+---------+----------+----------+-----------+---------+--------------+-------------------+------+
|   202006|       USA|       CVL|         50|        1|           4.0|-0.7479237991763841|     1|
|   202006|       USA|       CVL|         50|        4|          -9.5|-0.7479237991763841|     0|
|   202105|       GOV|       GBR|         12|        3|          -4.4|  -2.12042405373999|     0|
|   202105|       GOV|       GBR|        108|        3|          -4.4|  -1.27370711645759|     0|
|   202106|       BUS|       EDU|         10|        1|           5.2|   4.94505494505495|     1|
|   202106|       FRA|    MCOELI|          2|        1|           1.9|                0.0|     1|
|   202106|       GOV|       CAN|         55|        1|           7.0|  -2.13500471959119|     1|
|   202106|       GO

In [62]:
# Apply String Indexer: Transform Actor Codes into labels A1 and A2 (required for some classifiers)

from pyspark.ml.feature import StringIndexer

df_model = StringIndexer(
    inputCol='Actor1Code', 
    outputCol='A1', 
    handleInvalid='keep').fit(df_model).transform(df_model)

df_model = StringIndexer(
    inputCol='Actor2Code', 
    outputCol='A2', 
    handleInvalid='keep').fit(df_model).transform(df_model)

df_model.show()

+---------+----------+----------+-----------+---------+--------------+-------------------+------+-----+-----+
|MonthYear|Actor1Code|Actor2Code|NumMentions|QuadClass|GoldsteinScale|            AvgTone|Impact|   A1|   A2|
+---------+----------+----------+-----------+---------+--------------+-------------------+------+-----+-----+
|   202006|       USA|       CVL|         50|        1|           4.0|-0.7479237991763841|     1|  0.0|  2.0|
|   202006|       USA|       CVL|         50|        4|          -9.5|-0.7479237991763841|     0|  0.0|  2.0|
|   202105|       GOV|       GBR|         12|        3|          -4.4|  -2.12042405373999|     0|  1.0|  5.0|
|   202105|       GOV|       GBR|        108|        3|          -4.4|  -1.27370711645759|     0|  1.0|  5.0|
|   202106|       BUS|       EDU|         10|        1|           5.2|   4.94505494505495|     1|  9.0|  6.0|
|   202106|       FRA|    MCOELI|          2|        1|           1.9|                0.0|     1| 11.0|216.0|
|   202106
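By default `StringIndexer` orders labels by descending frequency, so the most common actor code receives index 0.0. The sketch below imitates that ordering in plain Python. The alphabetical tie-break is an assumption based on Spark 3.x behaviour, and `frequency_index` is a hypothetical helper.

```python
from collections import Counter

def frequency_index(values):
    """Map each label to an index: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

actors = ["USA", "USA", "GOV", "GOV", "GOV", "BUS"]
print(frequency_index(actors))
```

The resulting numbers reflect only how often a code occurs, so they should be read as category identifiers rather than as magnitudes.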

In [63]:
# Drop columns that are no longer needed
df_model = df_model.drop('Actor1Code', 'Actor2Code', 'GoldsteinScale')
df_model.show()

+---------+-----------+---------+-------------------+------+-----+-----+
|MonthYear|NumMentions|QuadClass|            AvgTone|Impact|   A1|   A2|
+---------+-----------+---------+-------------------+------+-----+-----+
|   202006|         50|        1|-0.7479237991763841|     1|  0.0|  2.0|
|   202006|         50|        4|-0.7479237991763841|     0|  0.0|  2.0|
|   202105|         12|        3|  -2.12042405373999|     0|  1.0|  5.0|
|   202105|        108|        3|  -1.27370711645759|     0|  1.0|  5.0|
|   202106|         10|        1|   4.94505494505495|     1|  9.0|  6.0|
|   202106|          2|        1|                0.0|     1| 11.0|216.0|
|   202106|         55|        1|  -2.13500471959119|     1|  1.0| 18.0|
|   202106|          1|        3|-2.8985507246376803|     0|  1.0|139.0|
|   202106|         14|        1|  -2.13500471959119|     1|  1.0| 21.0|
|   202106|         56|        1|  -2.13500471959119|     1|  1.0| 21.0|
|   202106|         14|        1|  -2.1350047195911

In [64]:
# Cast string columns to numerical values (the local csv was read without a schema, so every column arrives as a string)
from pyspark.sql.functions import col

df_model = df_model.select(col('MonthYear').cast('float'),
                         col('NumMentions').cast('float'),
                         col('QuadClass').cast('float'),
                         col('AvgTone').cast('float'),
                         col('Impact'),
                         col('A1'),
                         col('A2'),
                        )
df_model.show()

+---------+-----------+---------+----------+------+-----+-----+
|MonthYear|NumMentions|QuadClass|   AvgTone|Impact|   A1|   A2|
+---------+-----------+---------+----------+------+-----+-----+
| 202006.0|       50.0|      1.0|-0.7479238|     1|  0.0|  2.0|
| 202006.0|       50.0|      4.0|-0.7479238|     0|  0.0|  2.0|
| 202105.0|       12.0|      3.0| -2.120424|     0|  1.0|  5.0|
| 202105.0|      108.0|      3.0|-1.2737072|     0|  1.0|  5.0|
| 202106.0|       10.0|      1.0|  4.945055|     1|  9.0|  6.0|
| 202106.0|        2.0|      1.0|       0.0|     1| 11.0|216.0|
| 202106.0|       55.0|      1.0|-2.1350048|     1|  1.0| 18.0|
| 202106.0|        1.0|      3.0|-2.8985507|     0|  1.0|139.0|
| 202106.0|       14.0|      1.0|-2.1350048|     1|  1.0| 21.0|
| 202106.0|       56.0|      1.0|-2.1350048|     1|  1.0| 21.0|
| 202106.0|       14.0|      1.0|-2.1350048|     1|  1.0| 21.0|
| 202106.0|       56.0|      1.0|-2.1350048|     1|  1.0| 21.0|
| 202106.0|        1.0|      3.0|-2.8985

In [73]:
# Spark ML estimators expect all features combined into a single vector column.
# To predict "Impact", the columns "MonthYear", "NumMentions", "QuadClass" and "AvgTone" are assembled into one "features" column.
# A1 and A2 are left out: after StringIndexer they have too many categories to be fitted effectively by a decision tree classifier

# Assemble all the features with VectorAssembler
required_features = ['MonthYear',
                    'NumMentions',
                    'QuadClass',
                    'AvgTone',
                   ]

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=required_features, outputCol='features')
transformed_data = assembler.transform(df_model)

Exception ignored in: <function JavaWrapper.__del__ at 0x7fc373b91430>
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/ml/wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'RandomForestClassifier' object has no attribute '_java_obj'


In [74]:
transformed_data.show(5)

+---------+-----------+---------+----------+------+---+---+--------------------+
|MonthYear|NumMentions|QuadClass|   AvgTone|Impact| A1| A2|            features|
+---------+-----------+---------+----------+------+---+---+--------------------+
| 202006.0|       50.0|      1.0|-0.7479238|     1|0.0|2.0|[202006.0,50.0,1....|
| 202006.0|       50.0|      4.0|-0.7479238|     0|0.0|2.0|[202006.0,50.0,4....|
| 202105.0|       12.0|      3.0| -2.120424|     0|1.0|5.0|[202105.0,12.0,3....|
| 202105.0|      108.0|      3.0|-1.2737072|     0|1.0|5.0|[202105.0,108.0,3...|
| 202106.0|       10.0|      1.0|  4.945055|     1|9.0|6.0|[202106.0,10.0,1....|
+---------+-----------+---------+----------+------+---+---+--------------------+
only showing top 5 rows



### Modelling - Random Forest Classifier

In [75]:
# Train-Test Split
(training_data, test_data) = transformed_data.randomSplit([0.8,0.2])
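`randomSplit` assigns each row to a split at random, so the 80/20 proportions are approximate and the result changes between runs unless a seed is passed (for example `randomSplit([0.8, 0.2], seed=42)`). The following plain-Python sketch illustrates the idea conceptually. It is not Spark's implementation, and `random_split` is an illustrative name.

```python
import random
from itertools import accumulate

def random_split(rows, weights, seed=None):
    """Assign each row to a split by drawing a uniform number against cumulative weights."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))  # cumulative upper bounds
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # the last split catches any floating-point remainder
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1000), [0.8, 0.2], seed=42)
print(len(train), len(test))
```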

In [76]:
# Fit RandomForest Classifier
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='Impact',
                            featuresCol='features',
                            maxDepth=5,
                            maxBins=32)

In [77]:
model = rf.fit(training_data)

In [78]:
predictions = model.transform(test_data)
predictions.show(3)

+---------+-----------+---------+----------+------+-----+----+--------------------+--------------------+--------------------+----------+
|MonthYear|NumMentions|QuadClass|   AvgTone|Impact|   A1|  A2|            features|       rawPrediction|         probability|prediction|
+---------+-----------+---------+----------+------+-----+----+--------------------+--------------------+--------------------+----------+
| 202006.0|        2.0|      1.0|-1.0416666|     1|111.0|18.0|[202006.0,2.0,1.0...|[0.40941026265029...|[0.02047051313251...|       1.0|
| 202006.0|       16.0|      1.0| 5.0675673|     1|  4.0|61.0|[202006.0,16.0,1....|[0.23085201088695...|[0.01154260054434...|       1.0|
| 202105.0|        4.0|      4.0|-12.328767|     0|  0.0|21.0|[202105.0,4.0,4.0...|[17.5615245009074...|[0.87807622504537...|       0.0|
+---------+-----------+---------+----------+------+-----+----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



### Model Evaluation

In [79]:
# Evaluate our model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol='Impact', 
    predictionCol='prediction', 
    metricName='accuracy')

In [80]:
accuracy = evaluator.evaluate(predictions)
print('Test Accuracy = ', accuracy)

Test Accuracy =  0.9716088328075709
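Accuracy is the share of test rows whose prediction equals the label. It is easier to interpret next to the score obtained by always predicting the most common class. A small sketch of both quantities in plain Python follows. The function names are illustrative and the sample values are made up for the example, not taken from the model above.

```python
from collections import Counter

def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    pairs = list(zip(labels, predictions))
    if not pairs:
        raise ValueError("no predictions to score")
    return sum(1 for y, p in pairs if y == p) / len(pairs)

def majority_baseline(labels):
    """Accuracy achieved by always predicting the most frequent label."""
    counts = Counter(labels)
    return max(counts.values()) / len(labels)

labels = [1, 1, 1, 0]
predictions = [1.0, 1.0, 0.0, 0.0]
print(accuracy(labels, predictions), majority_baseline(labels))
```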


### Deep Learning in Spark
https://towardsdatascience.com/deep-learning-with-apache-spark-part-1-6d397c16abd