In [1]:
import sqlite3
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft

from pyspark.sql.functions import isnan, when, count, col

In [2]:
with sqlite3.connect('database.sqlite') as con:
    detailed_matches = pd.read_sql_query("""SELECT Match.id,
                                        Country.name AS country_name,
                                        League.name AS league_name, 
                                        season, 
                                        stage, 
                                        date,
                                        HT.team_long_name AS  home_team,
                                        AT.team_long_name AS away_team,
                                        home_team_goal, 
                                        away_team_goal,
                                        goal,
                                        shoton,
                                        shotoff,
                                        foulcommit,
                                        card,
                                        cross,
                                        corner,
                                        possession,
                                        B365H as betting_odds_home,
                                        B365D as betting_odds_draw,
                                        B365A as betting_odds_away
                                FROM Match
                                JOIN Country on Country.id = Match.country_id
                                JOIN League on League.id = Match.league_id
                                LEFT JOIN Team AS HT on HT.team_api_id = Match.home_team_api_id
                                LEFT JOIN Team AS AT on AT.team_api_id = Match.away_team_api_id
                                ORDER by date""", con)

In [3]:
detailed_matches.to_csv("detailed_matches.csv")

# data ingestion as parquet with spark

In [4]:
spark = SparkSession.builder.master('local[4]').config("spark.executor.memory", "1g").config("spark.driver.memory", "2g").appName('spark_ml_soccer2').getOrCreate()
sc = spark.sparkContext

In [5]:
detailed_matches_sparkDF = spark.read.options(header=True, nullValue='NA', inferSchema=True).csv('./detailed_matches.csv')
detailed_matches_sparkDF.show()

+---+-----+------------+--------------------+---------+-----+-------------------+--------------------+--------------------+--------------+--------------+----+------+-------+----------+----+-----+------+----------+-----------------+-----------------+-----------------+
|_c0|   id|country_name|         league_name|   season|stage|               date|           home_team|           away_team|home_team_goal|away_team_goal|goal|shoton|shotoff|foulcommit|card|cross|corner|possession|betting_odds_home|betting_odds_draw|betting_odds_away|
+---+-----+------------+--------------------+---------+-----+-------------------+--------------------+--------------------+--------------+--------------+----+------+-------+----------+----+-----+------+----------+-----------------+-----------------+-----------------+
|  0|24559| Switzerland|Switzerland Super...|2008/2009|    1|2008-07-18 00:00:00|      BSC Young Boys|            FC Basel|             1|             2|null|  null|   null|      null|null| null| 

In [6]:
detailed_matches_sparkDF = spark.read.options(header=True, nullValue='NA', inferSchema=True).csv('./detailed_matches.csv')
detailed_matches_sparkDF.show()

+---+-----+------------+--------------------+---------+-----+-------------------+--------------------+--------------------+--------------+--------------+----+------+-------+----------+----+-----+------+----------+-----------------+-----------------+-----------------+
|_c0|   id|country_name|         league_name|   season|stage|               date|           home_team|           away_team|home_team_goal|away_team_goal|goal|shoton|shotoff|foulcommit|card|cross|corner|possession|betting_odds_home|betting_odds_draw|betting_odds_away|
+---+-----+------------+--------------------+---------+-----+-------------------+--------------------+--------------------+--------------+--------------+----+------+-------+----------+----+-----+------+----------+-----------------+-----------------+-----------------+
|  0|24559| Switzerland|Switzerland Super...|2008/2009|    1|2008-07-18 00:00:00|      BSC Young Boys|            FC Basel|             1|             2|null|  null|   null|      null|null| null| 

In [7]:
#write parquet
detailed_matches_sparkDF.write.mode("overwrite").parquet("detailed_matches.parquet")

In [8]:
parquetDF = spark.read.parquet("detailed_matches.parquet")

In [9]:
parquetDF.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- id: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- season: string (nullable = true)
 |-- stage: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- home_team: string (nullable = true)
 |-- away_team: string (nullable = true)
 |-- home_team_goal: integer (nullable = true)
 |-- away_team_goal: integer (nullable = true)
 |-- goal: string (nullable = true)
 |-- shoton: string (nullable = true)
 |-- shotoff: string (nullable = true)
 |-- foulcommit: string (nullable = true)
 |-- card: string (nullable = true)
 |-- cross: string (nullable = true)
 |-- corner: string (nullable = true)
 |-- possession: string (nullable = true)
 |-- betting_odds_home: double (nullable = true)
 |-- betting_odds_draw: double (nullable = true)
 |-- betting_odds_away: double (nullable = true)



In [10]:
parquetDF.write.partitionBy("season").mode("overwrite").parquet('matchBySeason.parquet')

By partitioning the file by season, I broke up the match data, by season, into seperate files we can then load and use in python for analysis. 


This is what the partitioned file looks like:

In [13]:
parDF=spark.read.parquet("matchBySeason.parquet/season=2015%2F2016")
parDF.show()

+-----+-----+------------+--------------------+-----+-------------------+--------------------+--------------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------+-----------------+
|  _c0|   id|country_name|         league_name|stage|               date|           home_team|           away_team|home_team_goal|away_team_goal|                goal|              shoton|             shotoff|          foulcommit|                card|               cross|              corner|          possession|betting_odds_home|betting_odds_draw|betting_odds_away|
+-----+-----+------------+--------------------+-----+-------------------+--------------------+--------------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------