# Extract and Clean Data from all_tournaments.csv

Primero creamos una SparkSession (el punto de entrada a Spark).

In [1]:
import os

import findspark
# Presumably locates the local Spark installation so the pyspark imports below resolve;
# keep this call before them.
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, DateType, BooleanType,
                               DoubleType, IntegerType, StringType, TimestampType)
from pyspark.sql.functions import col, udf

# Local session used by every cell below.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("tenis-matches")
    .getOrCreate()
)

In [31]:
# The raw data lives in <parent of the working directory>/dataset/raw.
project_dir = os.path.abspath(os.path.pardir)
raw_file_path = os.path.join(project_dir, "dataset", "raw", "all_tournaments.csv")
tor = spark.read.csv("file:///" + raw_file_path, header=True)

In [32]:
n_tournaments = tor.count()
print(f"There are {n_tournaments} tournaments.")

There are 36488 tournaments.


### Eliminaremos algunas columnas del dataset que no nos sirven

In [33]:
# Inspect the schema as read: every column currently comes in as a string.
tor.printSchema()

root
 |-- year: string (nullable = true)
 |-- tournament: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- court_surface: string (nullable = true)
 |-- prize_money: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- masters: string (nullable = true)



Eliminaremos las siguientes columnas que no serán útiles para nuestro análisis:
- prize_money
- currency
- masters

In [34]:
# DataFrame.drop accepts several column names at once.
unused_columns = ("prize_money", "currency", "masters")
tor = tor.drop(*unused_columns)

In [35]:
# Confirm that prize_money, currency and masters are no longer in the schema.
tor.printSchema()

root
 |-- year: string (nullable = true)
 |-- tournament: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- court_surface: string (nullable = true)



No eliminaremos los torneos con valores nulos, ya que eliminarlos podría generar inconsistencias con los otros datasets.

Eliminaremos las comillas dobles (") de todos los nombres de torneo.

In [36]:
from pyspark.sql.functions import asc, desc

# Reverse-sorted peek at the names, to spot malformed values before cleaning.
(tor.select("tournament")
    .orderBy(desc("tournament"))
    .show(n=10, truncate=False))

+------------------------------------------------+
|tournament                                      |
+------------------------------------------------+
||tf-junior-circuit---group-4_itf_juniors1       |
||tf-junior-circuit---group-4_itf_juniors0       |
||tf-junior-circuit---colombo-week-2_itf_juniors1|
||tf-junior-circuit---colombo-week-2_itf_juniors0|
|zurich                                          |
|zurich                                          |
|ztk-junior-open_itf_juniors1                    |
|ztk-junior-open_itf_juniors1                    |
|ztk-junior-open_itf_juniors1                    |
|ztk-junior-open_itf_juniors1                    |
+------------------------------------------------+
only showing top 10 rows



In [37]:
def removeCharacter(df, column, char):
    """Return ``df`` with every occurrence of ``char`` removed from string ``column``.

    Uses Spark's built-in ``regexp_replace`` instead of a Python UDF. The previous
    ``udf(lambda x: x.replace(...))`` raised on null values (the column is nullable)
    and paid Python serialization cost for every row.

    Args:
        df: Input Spark DataFrame.
        column: Name of the string column to clean; it is replaced in the result.
        char: Literal text to remove. Regex metacharacters are escaped, so it is
            matched literally, like ``str.replace``.

    Returns:
        A new DataFrame in which null values in ``column`` stay null.
    """
    import re
    from pyspark.sql.functions import regexp_replace

    # re.escape only backslash-escapes punctuation/whitespace, which Java regex
    # (used by regexp_replace) also treats as literal characters.
    return df.withColumn(column, regexp_replace(col(column), re.escape(char), ""))

In [38]:
# Strip double quotes from tournament names, then re-check the same reverse-sorted view.
tor = removeCharacter(tor, "tournament", '"')
(tor.select("tournament")
    .orderBy(desc("tournament"))
    .show(n=10, truncate=False))

+------------------------------------------------+
|tournament                                      |
+------------------------------------------------+
||tf-junior-circuit---group-4_itf_juniors1       |
||tf-junior-circuit---group-4_itf_juniors0       |
||tf-junior-circuit---colombo-week-2_itf_juniors1|
||tf-junior-circuit---colombo-week-2_itf_juniors0|
|zurich                                          |
|zurich                                          |
|ztk-junior-open_itf_juniors1                    |
|ztk-junior-open_itf_juniors1                    |
|ztk-junior-open_itf_juniors1                    |
|ztk-junior-open_itf_juniors1                    |
+------------------------------------------------+
only showing top 10 rows



Existen algunos torneos sin fecha de finalización. Para estos, usaremos como fecha de finalización la misma que la de comienzo.

Primero cambiamos los tipos a Date

Luego reemplazamos los `start_date` nulos por el 1 de enero del año del torneo (columna `year`).

In [43]:
from pyspark.sql.functions import col, unix_timestamp, to_date, when

# Parse both date columns with an explicit ISO pattern. This assumes raw values look like
# 2020-11-16, the format the previous "yyyy-mm-dd" pattern was aiming for (TODO confirm).
# "MM" is month-of-year; lowercase "mm" is minute-of-hour and silently dropped the month.
# Reads from `tor`, the previous cell's result (`new_df` was not defined in any earlier cell).
DATE_FORMAT = "yyyy-MM-dd"
for date_col in ("start_date", "end_date"):
    tor = tor.withColumn(date_col, to_date(col(date_col), DATE_FORMAT))

tor.printSchema()
tor.select("*").where(tor["start_date"].isNull()).show(10)

root
 |-- year: string (nullable = true)
 |-- tournament: string (nullable = true)
 |-- start_date: date (nullable = true)
 |-- end_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- court_surface: string (nullable = true)

+----+----------+----------+--------+--------+-------------+
|year|tournament|start_date|end_date|location|court_surface|
+----+----------+----------+--------+--------+-------------+
|1974| las-vegas|      null|    null|     USA|         Clay|
|1974|     tokyo|      null|    null|   Japan|         Hard|
+----+----------+----------+--------+--------+-------------+



In [67]:
from pyspark.sql.functions import col, unix_timestamp, to_date, when, lit
import datetime

from pyspark.sql.functions import coalesce, concat

# Fill a missing start_date with January 1st of the tournament's own `year` (the "1/1/año"
# rule), instead of the previous hard-coded 1974-01-01 that only fit the known 1974 rows.
# start_date is already a DateType column here, so no second parse is needed; the old
# re-parse also used the wrong "mm" (minute) pattern.
# If `year` is itself null, start_date stays null and shows up in the check below.
first_day_of_year = to_date(concat(col("year"), lit("-01-01")), "yyyy-MM-dd")
tor = tor.withColumn("start_date", coalesce(col("start_date"), first_day_of_year))

tor.select("*").where(tor["start_date"].isNull()).show(10)

+----+----------+----------+--------+--------+-------------+
|year|tournament|start_date|end_date|location|court_surface|
+----+----------+----------+--------+--------+-------------+
+----+----------+----------+--------+--------+-------------+



In [68]:
from pyspark.sql.functions import coalesce

# Tournaments without an end date are treated as ending on their start date.
end_or_start = coalesce(col("end_date"), col("start_date"))
tor = tor.withColumn("end_date", end_or_start)

In [69]:
# Verify that start_date and end_date are now of type date.
tor.printSchema()

root
 |-- year: string (nullable = true)
 |-- tournament: string (nullable = true)
 |-- start_date: date (nullable = true)
 |-- end_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- court_surface: string (nullable = true)



In [70]:
# Write the cleaned data under <parent of the working directory>/dataset/processed.
# NOTE(review): Spark typically writes a directory of part files at this path rather than
# a single CSV file; confirm downstream readers expect that.
processed_dir = os.path.join(os.path.abspath(os.path.pardir), "dataset", "processed")
processed_file_path = os.path.join(processed_dir, "all_tournaments.csv")
(tor.write
    .mode("overwrite")
    .option("header", True)
    .csv("file:///" + processed_file_path))

## Creación del script para ejecutar la extracción de datos

In [71]:
import os

# The standalone cleaning script is written one directory above this notebook.
get_processed_data_script_file = os.path.join(os.pardir, "process_all_tournaments.py")

In [78]:
%%writefile $get_processed_data_script_file

import os
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import  (StructType, StructField, DateType, BooleanType, DoubleType, IntegerType, StringType, TimestampType)
from pyspark.sql.functions import col, unix_timestamp, to_date, when, lit, udf, coalesce
import datetime

def extract_data(raw_file_path=None):
    """Load the raw tournaments CSV and return a cleaned Spark DataFrame.

    The cleaning steps are:

    - drop the unused columns prize_money, currency and masters;
    - strip double quotes from tournament names;
    - parse start_date/end_date as ``yyyy-MM-dd`` dates;
    - fill a missing start_date with January 1st of the row's ``year``;
    - fill a missing end_date with the start_date.

    Args:
        raw_file_path: Optional path to the raw CSV. Defaults to
            ``dataset/raw/all_tournaments.csv`` next to this script, so the script
            works regardless of the current working directory.

    Returns:
        The cleaned pyspark DataFrame.
    """
    from pyspark.sql.functions import concat

    spark = SparkSession.builder.master("local[1]").appName("tenis-matches").getOrCreate()

    if raw_file_path is None:
        # This script lives in the project root, next to the dataset/ directory.
        project_root = os.path.dirname(os.path.abspath(__file__))
        raw_file_path = os.path.join(project_root, "dataset", "raw", "all_tournaments.csv")
    tor = spark.read.csv("file:///" + raw_file_path, header=True)

    tor = tor.drop("prize_money", "currency", "masters")
    tor = removeCharacter(tor, "tournament", '"')

    # "MM" is month-of-year; the previous "mm" (minute-of-hour) silently dropped the month.
    iso_pattern = "yyyy-MM-dd"
    for date_col in ("start_date", "end_date"):
        tor = tor.withColumn(date_col, to_date(col(date_col), iso_pattern))

    # Missing start date -> January 1st of the tournament's own year (not a fixed 1974).
    first_day_of_year = to_date(concat(col("year"), lit("-01-01")), iso_pattern)
    tor = tor.withColumn("start_date", coalesce(col("start_date"), first_day_of_year))
    # Missing end date -> treat the tournament as ending on its start date.
    tor = tor.withColumn("end_date", coalesce(col("end_date"), col("start_date")))

    return tor

def removeCharacter(df, column, char):
    """Return ``df`` with every occurrence of ``char`` removed from string ``column``.

    Uses Spark's built-in ``regexp_replace`` instead of a Python UDF. The previous
    ``udf(lambda x: x.replace(...))`` raised on null values and paid Python
    serialization cost for every row.

    Args:
        df: Input Spark DataFrame.
        column: Name of the string column to clean; it is replaced in the result.
        char: Literal text to remove. It is regex-escaped, so it matches literally.

    Returns:
        A new DataFrame in which null values in ``column`` stay null.
    """
    import re
    from pyspark.sql.functions import regexp_replace

    # re.escape only backslash-escapes punctuation/whitespace, which Java regex treats literally.
    return df.withColumn(column, regexp_replace(col(column), re.escape(char), ""))
    
if __name__ == '__main__':
    df = extract_data()
    # Resolve the output path from this script's location (the project root, next to
    # dataset/) rather than the current working directory, so it can be launched from anywhere.
    project_root = os.path.dirname(os.path.abspath(__file__))
    processed_file_path = os.path.join(project_root, "dataset", "processed", "all_tournaments.csv")
    df.write.format("csv").option("header", True).mode('overwrite').save("file:///" + processed_file_path)

Overwriting ..\process_all_tournaments.py


In [79]:
# Run the generated script in a separate Python process to check it works outside this kernel.
!python $get_processed_data_script_file

The system cannot find the path specified.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2020-11-16 23:11:24,858 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2020-11-16 23:11:24,859 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.

[Stage 1:>                                                          (0 + 1) / 1]
                                                                                


SUCCESS: The process with PID 1392 (child process of PID 5880) has been terminated.
SUCCESS: The process with PID 5880 (child process of PID 14564) has been terminated.
SUCCESS: The process with PID 14564 (child process of PID 24176) has been terminated.
