# Análisis de Datos - Ejercicio
Autor: Raúl Baeza Osuna


## Enunciado
- Elige un dataset futbol.csv o covid.csv
	- Covid
		https://www.ecdc.europa.eu/en/publications-data/download-todays-data-geographic-distribution-covid-19-cases-worldwide
	- Futbol https://rstudio-pubs-static.s3.amazonaws.com/338127_f85723bf6d9b4637a4c5e1ff8f838432.html
		https://www.football-data.co.uk/spainm.php
		https://www.football-data.co.uk/notes.txt
- Crea un dataframe de Spark cargando los datos
- Comprueba el esquema
- Plantea 3 cuestiones análiticas de resumen de datos.
	- Ej. ¿cuantos goles se han marcado partidos como visitante?
	- Ej. ¿cuanto casos de covid había aculados en España en marzo de 2021?
	- Contesta al menos 2 de ellas usando funciones de la API de Dataframes y otra usando sentencias SQL.

- Persiste el resultado en formato Parquet.
- Documenta el ejercicio usando lenguaje Markdown.
- Envía el notebook al profesor en formato .ipynb y en .html

## Iniciar Spark

In [157]:
import findspark
findspark.init()

In [158]:
from pyspark.sql import SparkSession

In [159]:
spark: SparkSession = SparkSession.builder.appName('IntroSparkSQL')\
    .config('spark.sql.repl.eagerEval.enabled',True)\
    .config('spark.sql.repl.eagerEval.maxNumRows', 10)\
    .getOrCreate()

In [160]:
spark

## Dataset seleccionado: COVID-19

En este trabajo, se realizará un análisis de datos utilizando un dataset de COVID-19. El dataset contiene información sobre la distribución geográfica de los casos de COVID-19 a nivel mundial.

El objetivo del análisis es responder a tres preguntas analíticas basadas en los datos del dataset. Se utilizará la API de DataFrames y sentencias SQL para obtener las respuestas. Las preguntas planteadas son las siguientes:

1. ¿Cuántos casos de COVID-19 se han registrado en total en todo el mundo?
2. ¿Cuál es el país con el mayor número de casos acumulados de COVID-19?
3. ¿Cuál es la fecha con el mayor número de casos diarios de COVID-19 en España?

Se cargará el dataset en un dataframe de Spark, se comprobará el esquema y se realizará el análisis de datos para responder a las preguntas planteadas. Finalmente, los resultados se guardarán en formato Parquet para su posterior uso.

Autor: Raúl Baeza Osuna


## Crear Data Frame y Cargado de datos



In [161]:
pathCov="file:////home/training/ejercicios/covid/"

In [162]:
df_cov = spark.read.csv(pathCov+'covid.csv', header = True,inferSchema=True)

In [163]:
df_cov

dateRep,day,month,year,cases,deaths,countriesAndTerritories,geoId,countryterritoryCode,popData2018
29/03/2020,29,3,2020,15,1,Afghanistan,AF,AFG,37172386
28/03/2020,28,3,2020,16,1,Afghanistan,AF,AFG,37172386
27/03/2020,27,3,2020,0,0,Afghanistan,AF,AFG,37172386
26/03/2020,26,3,2020,33,0,Afghanistan,AF,AFG,37172386
25/03/2020,25,3,2020,2,0,Afghanistan,AF,AFG,37172386
24/03/2020,24,3,2020,6,1,Afghanistan,AF,AFG,37172386
23/03/2020,23,3,2020,10,0,Afghanistan,AF,AFG,37172386
22/03/2020,22,3,2020,0,0,Afghanistan,AF,AFG,37172386
21/03/2020,21,3,2020,2,0,Afghanistan,AF,AFG,37172386
20/03/2020,20,3,2020,0,0,Afghanistan,AF,AFG,37172386


## Compruebación de esquema

In [164]:
df_cov.printSchema()

root
 |-- dateRep: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- countriesAndTerritories: string (nullable = true)
 |-- geoId: string (nullable = true)
 |-- countryterritoryCode: string (nullable = true)
 |-- popData2018: integer (nullable = true)



## Preguntas Covid

### 1. ¿Cuántos casos de COVID-19 se han registrado en total en todo el mundo?
Respuesta con funciones de la API de Dataframes

In [165]:
from pyspark.sql.functions import  count,sum

#### Agrupado por Año

In [166]:
df_cov11 =df_cov.groupBy('year').agg(count('*').alias('Count'))

In [167]:
df_cov11

year,Count
2019,67
2020,7448


#### Exportación a csv y parquet

In [168]:

df_cov11.write.format('parquet').mode('overwrite').save(pathCov+'resultado/parquet11')
df_cov11.write.format('csv').mode('overwrite').save(pathCov+'resultado/csv11')

                                                                                

In [169]:
pruebaP = spark.read.format('parquet') \
    .load(pathCov+'resultado/parquet11')
print (pruebaP)
pruebaCsv = spark.read.format('csv') \
    .load(pathCov+'resultado/csv11')
print (pruebaCsv)

+----+-----+
|year|Count|
+----+-----+
|2020| 7448|
|2019|   67|
+----+-----+

+----+----+
| _c0| _c1|
+----+----+
|2020|7448|
|2019|  67|
+----+----+



#### Agrupado por territorios y paises

In [170]:
df_cov12=df_cov.groupBy('countriesAndTerritories').agg(sum('cases').alias('suma'))

In [171]:
df_cov12

countriesAndTerritories,suma
Chad,5
Anguilla,2
Paraguay,59
Russia,1264
Burkina_Faso,180
Cases_on_an_inter...,696
Senegal,130
Sweden,3447
Timor_Leste,1
Guyana,8


#### Exportación a csv y parquet

In [172]:
df_cov12.write.format('parquet').mode('overwrite').save(pathCov+'resultado/parquet12')
df_cov12.write.format('csv').mode('overwrite').save(pathCov+'resultado/csv12')

                                                                                

In [173]:
pruebaP = spark.read.format('parquet') \
    .load(pathCov+'resultado/parquet12')
print (pruebaP)
pruebaCsv = spark.read.format('csv') \
    .load(pathCov+'resultado/csv12')
print (pruebaCsv)

+-----------------------+----+
|countriesAndTerritories|suma|
+-----------------------+----+
|   United_Republic_o...|  13|
|   British_Virgin_Is...|   2|
|   Saint_Kitts_and_N...|   2|
|    Antigua_and_Barbuda|   7|
|    Trinidad_and_Tobago|  74|
|      Equatorial_Guinea|  13|
|   Cases_on_an_inter...| 696|
|                Senegal| 130|
|         Czech_Republic|2663|
|                Somalia|   3|
+-----------------------+----+
only showing top 10 rows

+--------------------+----+
|                 _c0| _c1|
+--------------------+----+
|             Austria|8291|
|  Dominican_Republic| 719|
|    French_Polynesia|  34|
|               Nepal|   5|
|          Azerbaijan| 182|
|       Guinea_Bissau|   2|
|               Libya|   1|
|United_Arab_Emirates| 468|
|      Czech_Republic|2663|
|             Somalia|   3|
+--------------------+----+
only showing top 10 rows



#### Exportación a csv y parquet

#### Total Global

In [174]:
total_sum = df_cov.agg(sum('cases').alias('total'))


In [175]:
total_sum

total
657140


In [176]:
df_cov13=total_sum

In [177]:
type(df_cov13)

pyspark.sql.dataframe.DataFrame

#### Exportación a csv y parquet

In [178]:
df_cov13.write.format('parquet').mode('overwrite').save(pathCov+'resultado/parquet13')
df_cov13.write.format('csv').mode('overwrite').save(pathCov+'resultado/csv13')

In [179]:
pruebaP = spark.read.format('parquet') \
    .load(pathCov+'resultado/parquet13')
print (pruebaP)
pruebaCsv = spark.read.format('csv') \
    .load(pathCov+'resultado/csv13')
print (pruebaCsv)

+------+
| total|
+------+
|657140|
+------+

+------+
|   _c0|
+------+
|657140|
+------+



### 2. ¿Cuál es el país con el mayor número de casos acumulados de COVID-19?
Respuesta con funciones de la API de Dataframes


In [180]:
from pyspark.sql.functions import  max

In [181]:
df_cov2=df_cov.groupBy('countriesAndTerritories').agg( max('cases').alias('MaxCases')).orderBy('MaxCases',ascending=False).limit(1)

In [182]:
df_cov2

countriesAndTerritories,MaxCases
United_States_of_...,19979


#### Exportación a csv y parquet

In [183]:
df_cov2.write.format('parquet').mode('overwrite').save(pathCov+'resultado/parquet2')
df_cov2.write.format('csv').mode('overwrite').save(pathCov+'resultado/csv2')

In [184]:
pruebaP = spark.read.format('parquet') \
    .load(pathCov+'resultado/parquet2')
print (pruebaP)
pruebaCsv = spark.read.format('csv') \
    .load(pathCov+'resultado/csv2')
print (pruebaCsv)

+-----------------------+--------+
|countriesAndTerritories|MaxCases|
+-----------------------+--------+
|   United_States_of_...|   19979|
+-----------------------+--------+

+--------------------+-----+
|                 _c0|  _c1|
+--------------------+-----+
|United_States_of_...|19979|
+--------------------+-----+



In [185]:
from pyspark.sql import Window
from pyspark.sql.functions import max, col

windowSpec = Window.orderBy(col('cases').desc())

max_cases_country = df_cov.select('countriesAndTerritories', 'cases', max('cases').over(windowSpec).alias('MaxCases')) \
                         .filter(col('MaxCases').isNotNull()) \
                         .limit(1)

max_cases_country.show()


+-----------------------+-----+--------+
|countriesAndTerritories|cases|MaxCases|
+-----------------------+-----+--------+
|   United_States_of_...|19979|   19979|
+-----------------------+-----+--------+



### 3. ¿Cuál es la fecha con el mayor número de casos diarios de COVID-19 en España?
Respuesta SQL

In [186]:
df_cov.createOrReplaceTempView('covidSql')
df_cov31 = spark.sql("""
    select * from covidSql
    where countriesAndTerritories= 'Spain' and cases in  (select  max (cases) 
    from covidSql
    where countriesAndTerritories= 'Spain'
    group by countriesAndTerritories)
    
""")

In [187]:
df_cov31.show()

+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+
|   dateRep|day|month|year|cases|deaths|countriesAndTerritories|geoId|countryterritoryCode|popData2018|
+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+
|27/03/2020| 27|    3|2020| 8578|   655|                  Spain|   ES|                 ESP|   46723749|
+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+



#### Exportación a csv y parquet

In [188]:
df_cov31.write.format('parquet').mode('overwrite').save(pathCov+'resultado/parquet3')
df_cov31.write.format('csv').mode('overwrite').save(pathCov+'resultado/csv3')

In [189]:
pruebaP = spark.read.format('parquet') \
    .load(pathCov+'resultado/parquet3')
print (pruebaP)
pruebaCsv = spark.read.format('csv') \
    .load(pathCov+'resultado/csv3')
print (pruebaCsv)

+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+
|   dateRep|day|month|year|cases|deaths|countriesAndTerritories|geoId|countryterritoryCode|popData2018|
+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+
|27/03/2020| 27|    3|2020| 8578|   655|                  Spain|   ES|                 ESP|   46723749|
+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+

+----------+---+---+----+----+---+-----+---+---+--------+
|       _c0|_c1|_c2| _c3| _c4|_c5|  _c6|_c7|_c8|     _c9|
+----------+---+---+----+----+---+-----+---+---+--------+
|27/03/2020| 27|  3|2020|8578|655|Spain| ES|ESP|46723749|
+----------+---+---+----+----+---+-----+---+---+--------+



## Apagar spark

In [190]:
spark.stop()