## PySpark 
For PySpark, enter your Python code in the cell below. You can run the cell by hitting "Run" from the above toolbar. Example Python code: <br>

## Spark SQL
Enter %%sql in the first line and your SQL query in subsquent lines. You can run the query by hitting "Run" from the above toolbar. Example SQL query (you will need to replace &lt;database&gt; and &lt;table&gt; with your specific database and table): <br>


In [None]:
%%sql
CREATE DATABASE IF NOT EXISTS covid_db;

In [None]:
query = """
CREATE EXTERNAL TABLE IF NOT EXISTS covid_db.main_table (
    Province_State VARCHAR(255),
    Country_Region VARCHAR(255),
    Lat FLOAT,
    Long FLOAT,
    `2020_01_22` INT,
    `2020_01_23` INT,
    `2020_01_24` INT,
    `2020_01_25` INT,
    `2020_01_26` INT,
    `2020_01_27` INT,
    `2020_01_28` INT,
    `2020_01_29` INT,
    `2020_01_30` INT,
    `2020_01_31` INT,
    `2020_02_01` INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://covid19-dataanalytics-mulder/data/';
"""
spark.sql(query)


In [None]:
spark.sql("USE covid_db")
spark.sql("SHOW TABLES").show()
spark.sql("DESCRIBE TABLE main_table").show()


## Se ve el esquema de la siguiente forma
Calculation completed.
+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
| covid_db|main_table|      false|
+---------+----------+-----------+

+--------------+------------+-------+
|      col_name|   data_type|comment|
+--------------+------------+-------+
|Province_State|varchar(255)|   null|
|Country_Region|varchar(255)|   null|
|           Lat|       float|   null|
|          Long|       float|   null|
|    2020_01_22|         int|   null|
|    2020_01_23|         int|   null|
|    2020_01_24|         int|   null|
|    2020_01_25|         int|   null|
|    2020_01_26|         int|   null|
|    2020_01_27|         int|   null|
|    2020_01_28|         int|   null|
|    2020_01_29|         int|   null|
|    2020_01_30|         int|   null|
|    2020_01_31|         int|   null|
|    2020_02_01|         int|   null|
+--------------+------------+-------+

In [None]:
df = spark.sql("SELECT * FROM main_table")
df.show()

In [None]:
df = spark.read \
    .option("header", "true") \
    .csv("s3a://covid19-dataanalytics-mulder/data/time_series_covid19_confirmed_global.csv")
df.show()

Agregar programaticamente las fechas 

In [None]:
from datetime import datetime, timedelta

# Generate date columns
start_date = datetime(2020, 1, 1)
end_date = datetime(2024, 12, 31)
date_generated = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
date_columns = [date.strftime("%Y_%m_%d") for date in date_generated]

In [None]:
from pyspark.sql.functions import explode, array, struct, lit, col


date_columns = [col_name for col_name in date_columns if col_name in df.columns]

data_cols = [struct(lit(c).alias("date"), col(c).alias("value")) for c in date_columns]

df_long = df.select("Province_State", "Country_Region", "Lat", "Long", explode(array(*data_cols)).alias("date_value"))

df_long = df_long.select("Province_State", "Country_Region", "Lat", "Long", "date_value.date", "date_value.value")



Calculation started (calculation_id=a2c69e51-28a4-7ae2-ae1b-b4c01bfc8df3) in (session=82c69e4f-12c9-fed7-290f-edab3b2d26cc). Checking calculation status...
Progress:   0%|          |elapsed time = 00:00s
Calculation a2c69e51-28a4-7ae2-ae1b-b4c01bfc8df3 failed


  File "<stdin>", line 8, in <module>
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1511, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: cannot resolve '2020_01_01' given input columns: [spark_catalog.covid_db.main_table.2020_01_22, spark_catalog.covid_db.main_table.2020_01_23, spark_catalog.covid_db.main_table.2020_01_24, spark_catalog.covid_db.main_table.2020_01_25, spark_catalog.covid_db.main_table.2020_01_26, spark_catalog.covid_db.main_table.2020_01_27, spark_catalog.covid_db.main_table.2020_01_28, spark_catalog.covid_db.main_table.2020_01_29, spark_catalog.covid_db.main_table.2020_01_30, spark_catalog.covid_db.main_table.2020_01_31, spark_catalog.covid_db.main_table.2020_02_01, spark_catalog.covid_db.main_table.Country_Region, spark_catalog.covid_db.main_table.Lat, spark_catalog.covid_db.main_table.Long, spark_catalog.covid_db.main_table.Province_State];
'Project [Province_State#73, Country_Region#74, Lat#75, Long#76, explode(array(struct(date, 2020_01_01, NamePlaceholder, '2020_01_01), struct(date, 2020_01_02, NamePlaceholder, '2020_01_02), struct(date, 2020_01_03, NamePlaceholder, '2020_01_03), struct(date, 2020_01_04, NamePlaceholder, '2020_01_04), struct(date, 2020_01_05, NamePlaceholder, '2020_01_05), struct(date, 2020_01_06, NamePlaceholder, '2020_01_06), struct(date, 2020_01_07, NamePlaceholder, '2020_01_07), struct(date, 2020_01_08, NamePlaceholder, '2020_01_08), struct(date, 2020_01_09, NamePlaceholder, '2020_01_09), struct(date, 2020_01_10, NamePlaceholder, '2020_01_10), struct(date, 2020_01_11, NamePlaceholder, '2020_01_11), struct(date, 2020_01_12, NamePlaceholder, '2020_01_12), struct(date, 2020_01_13, NamePlaceholder, '2020_01_13), struct(date, 2020_01_14, NamePlaceholder, '2020_01_14), struct(date, 2020_01_15, NamePlaceholder, '2020_01_15), struct(date, 2020_01_16, NamePlaceholder, '2020_01_16), struct(date, 2020_01_17, NamePlaceholder, '2020_01_17), struct(date, 2020_01_18, NamePlaceholder, '2020_01_18), struct(date, 2020_01_19, NamePlaceholder, '2020_01_19), struct(date, 2020_01_20, NamePlaceholder, '2020_01_20), struct(date, 2020_01_21, NamePlaceholder, '2020_01_21), struct(date, 2020_01_22, value, 2020_01_22#77), struct(date, 2020_01_23, value, 2020_01_23#78), struct(date, 2020_01_24, value, 2020_01_24#79), ... 1803 more fields)) AS date_value#3819]
+- GlobalLimit 10
   +- LocalLimit 10
      +- Project [Province_State#73, Country_Region#74, Lat#75, Long#76, 2020_01_22#77, 2020_01_23#78, 2020_01_24#79, 2020_01_25#80, 2020_01_26#81, 2020_01_27#82, 2020_01_28#83, 2020_01_29#84, 2020_01_30#85, 2020_01_31#86, 2020_02_01#87]
         +- SubqueryAlias spark_catalog.covid_db.main_table
            +- HiveTableRelation [`covid_db`.`main_table`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [Province_State#73, Country_Region#74, Lat#75, Long#76, 2020_01_22#77, 2020_01_23#78, 2020_01_24#..., Partition Cols: []]

# En athena query...

In [None]:
SELECT * FROM "AwsDataCatalog"."covid_db"."main_table" ;