In [16]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from jinja2 import Template

In [2]:
# Start (or reuse) a Spark session named "ingestion" on the local master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ingestion")
    .getOrCreate()
)

In [147]:
# Placeholder values; both names are reassigned by later cells.
df_pandas = schema = ""

## Create Schema of Table


#### Define the CSV file paths

In [158]:
# Map each table name to its CSV file, located at transfermarkt/<table name>.csv.
tables_path = {
    table_name: f"transfermarkt/{table_name}.csv"
    for table_name in (
        'appearances',
        'club_games',
        'clubs',
        'competitions',
        'game_events',
        'game_lineups',
        'games',
        'player_valuations',
        'players',
    )
}

#### Generate StructType from Spark

In [6]:
# Load the CSV with pandas so that Spark can derive column types from the
# pandas dtypes. A frame returned by spark.read.csv cannot be used here:
# createDataFrame rejects an existing Spark DataFrame, and a header-only
# spark.read.csv would type every column as a string anyway.
df_pandas = pd.read_csv(tables_path["appearances"])

# Last expression of the cell: display the inferred StructType.
spark.createDataFrame(df_pandas).schema

StructType([StructField('appearance_id', StringType(), True), StructField('game_id', LongType(), True), StructField('player_id', LongType(), True), StructField('player_club_id', LongType(), True), StructField('player_current_club_id', LongType(), True), StructField('date', StringType(), True), StructField('player_name', StringType(), True), StructField('competition_id', StringType(), True), StructField('yellow_cards', LongType(), True), StructField('red_cards', LongType(), True), StructField('goals', LongType(), True), StructField('assists', LongType(), True), StructField('minutes_played', LongType(), True)])

##### Rewrite the schema with `types.`-qualified names
```python
types.StructType([
    types.StructField('appearance_id', types.StringType(), True), 
    types.StructField('game_id', types.LongType(), True), 
    types.StructField('player_id', types.LongType(), True), 
    types.StructField('player_club_id', types.LongType(), True), 
    types.StructField('player_current_club_id', types.LongType(), True), 
    types.StructField('date', types.StringType(), True), 
    types.StructField('player_name', types.StringType(), True), 
    types.StructField('competition_id', types.StringType(), True), 
    types.StructField('yellow_cards', types.LongType(), True), 
    types.StructField('red_cards', types.LongType(), True), 
    types.StructField('goals', types.LongType(), True), 
    types.StructField('assists', types.LongType(), True), 
    types.StructField('minutes_played', types.LongType(), True)
]) 
```

In [140]:
# Explicit schema for the appearances table, built from (column name, Spark type)
# pairs; every column is declared nullable.
schema = types.StructType([
    types.StructField(column_name, spark_type(), True)
    for column_name, spark_type in (
        ('appearance_id', types.StringType),
        ('game_id', types.LongType),
        ('player_id', types.LongType),
        ('player_club_id', types.LongType),
        ('player_current_club_id', types.LongType),
        ('date', types.StringType),
        ('player_name', types.StringType),
        ('competition_id', types.StringType),
        ('yellow_cards', types.LongType),
        ('red_cards', types.LongType),
        ('goals', types.LongType),
        ('assists', types.LongType),
        ('minutes_played', types.LongType),
    )
])

# Read the CSV with the explicit schema; the first line is treated as a header.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv("transfermarkt/appearances.csv")
)

#### Generate Schema from CSV file

In [None]:
def generateSchemaFromCSV(spark, path_file, sample_rows=5):
    """Infer a Spark schema for a CSV file from a small pandas sample.

    Only the first ``sample_rows`` data rows are read with pandas; the sample
    is then handed to ``spark.createDataFrame`` and the resulting schema is
    returned.

    Args:
        spark: Object exposing ``createDataFrame`` (a SparkSession).
        path_file: Path of the CSV file to sample.
        sample_rows: Number of data rows to read for inference. Defaults to 5;
            ``None`` makes pandas read the whole file.

    Returns:
        The ``schema`` attribute of the DataFrame created from the sample.

    Note:
        Types are inferred from the sample only, so a column whose sampled
        values are all missing may get a different type than the full data
        would (the players DDL recorded in this notebook shows
        ``contract_expiration_date`` as DOUBLE). Increase ``sample_rows`` if
        the inferred types look wrong.
    """
    # Reading just a few rows keeps inference cheap for large files.
    df_sample = pd.read_csv(path_file, nrows=sample_rows)
    return spark.createDataFrame(df_sample).schema

In [56]:
def convert_type(data_type):
    """Translate a Spark data type into the SQL column type used in the DDL.

    Args:
        data_type: Spark data type instance to translate.

    Returns:
        'VARCHAR(155)' for StringType, 'BIGINT' for LongType, 'DOUBLE' for
        DoubleType, and 'UNKNOWN' for anything else.
    """
    sql_by_spark_type = (
        (types.StringType(), 'VARCHAR(155)'),
        (types.LongType(), 'BIGINT'),
        (types.DoubleType(), 'DOUBLE'),
    )
    # Compare by equality, in declaration order; the first match wins.
    for spark_type, sql_type in sql_by_spark_type:
        if data_type == spark_type:
            return sql_type
    return 'UNKNOWN'

In [142]:

def generateStatementCreate(schema, table_name):
    """Render a DROP TABLE IF EXISTS / CREATE TABLE script for a schema.

    Each field of ``schema`` becomes one column definition whose SQL type is
    obtained from ``convert_type``. Table and column names are inserted into
    the statement as-is, without quoting or escaping.

    Args:
        schema: Object whose ``fields`` expose ``name`` and ``dataType``.
        table_name: Name of the table to drop and recreate.

    Returns:
        The rendered SQL script as a string.
    """
    ddl_template = Template('''
DROP TABLE IF EXISTS {{ table_name }};
CREATE TABLE {{ table_name  }} (
    {% for column in columns -%}
    {{ column.name }} {{ convert_type(column.dataType) }}{% if not loop.last %},{% endif %}
    {% endfor -%}
);
''')

    # convert_type is passed in so the template can call it for every column.
    render_context = {
        "columns": schema.fields,
        "convert_type": convert_type,
        "table_name": table_name,
    }
    return ddl_template.render(**render_context)

In [None]:
# Infer the players schema from its CSV, then render and display the matching DDL.
schema = generateSchemaFromCSV(spark, tables_path['players'])
players = generateStatementCreate(schema, 'players')
players

#### **Table players**
``` sql
DROP TABLE IF EXISTS players;
CREATE TABLE players (
     player_id BIGINT,
     first_name VARCHAR(155),
     last_name VARCHAR(155),
     name VARCHAR(155),
     last_season BIGINT,
     current_club_id BIGINT,
     player_code VARCHAR(155),
     country_of_birth VARCHAR(155),
     city_of_birth VARCHAR(155),
     country_of_citizenship VARCHAR(155),
     date_of_birth VARCHAR(155),
     sub_position VARCHAR(155),
     position VARCHAR(155),
     foot VARCHAR(155),
     height_in_cm DOUBLE,
     contract_expiration_date DOUBLE,
     agent_name VARCHAR(155),
     image_url VARCHAR(155),
     url VARCHAR(155),
     current_club_domestic_competition_id VARCHAR(155),
     current_club_name VARCHAR(155),
     market_value_in_eur BIGINT,
     highest_market_value_in_eur BIGINT
     ); 
```