# ETL
## with PySpark

In [57]:
from pyspark.sql import SparkSession

In [58]:
spark = SparkSession.builder.appName("ETLtest").\
    config("spark.jars", "postgresql-42.7.3.jar").\
        getOrCreate()

In [59]:
spark.sparkContext.setLogLevel("ERROR")


In [60]:
df = spark.read.csv("/mnt/d/Proyectos/Tutorial-SparkAWS/etl_data/WordData.txt")
df.show(3)

+--------------------+
|                 _c0|
+--------------------+
|This is a Japanes...|
|The team members ...|
|As the years pass...|
+--------------------+
only showing top 3 rows



## 2. Transformacion de datos

In [28]:
from pyspark.sql.functions import col
import pyspark.sql.functions as f

In [29]:
df = df.withColumn("splitedData", f.split(col("_c0"), " "))
df = df.withColumn("splitedDataExplode", f.explode(col("splitedData")))
df.show(4)
                   

+--------------------+--------------------+------------------+
|                 _c0|         splitedData|splitedDataExplode|
+--------------------+--------------------+------------------+
|This is a Japanes...|[This, is, a, Jap...|              This|
|This is a Japanes...|[This, is, a, Jap...|                is|
|This is a Japanes...|[This, is, a, Jap...|                 a|
|This is a Japanes...|[This, is, a, Jap...|          Japanese|
+--------------------+--------------------+------------------+
only showing top 4 rows



In [46]:
df_group = df.groupBy("splitedDataExplode").count()
df_group.show(4)

+------------------+-----+
|splitedDataExplode|count|
+------------------+-----+
|          Tomorrow|    4|
|                If|    8|
|             leave|    4|
|             corny|    4|
+------------------+-----+
only showing top 4 rows



## 3. Load - Conexion con BD

In [31]:
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost:5432/ny_taxy"
user = "root"
password = "root"
db = "ny_taxy"
tables = "mi_spark.tabla_aux"


In [53]:
df_group.write.format("jdbc").option("driver", driver).\
    option("url", url).\
        option("dbtable", tables).\
            mode("append").\
                option("user", user).\
                    option("password", password).\
                        save()

## 4. Leemos de una BD

In [54]:
rd_df = spark.read.format("jdbc").option("driver", driver).\
    option("user", user).\
        option("password", password).\
            option("url", url).\
                option("dbtable", "mi_spark.tabla_aux").load()

In [56]:
rd_df.filter(col("splitedDataExplode")=="Tomorrow").groupby(col("splitedDataExplode")).sum("count").show()

+------------------+----------+
|splitedDataExplode|sum(count)|
+------------------+----------+
|          Tomorrow|        16|
+------------------+----------+



## 5. Load - Conexión AWS

In [72]:
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://mipruebapyspark.cdqe2q4c2zqa.sa-east-1.rds.amazonaws.com/"
user = "postgres"
password = ""
db = "ny_taxy"
tables = "my_conexion_spark.tabla_aws"

In [67]:
df_group.write.format("jdbc").option("driver", driver).\
    option("url", url).\
        option("dbtable", tables).\
            mode("append").\
                option("user", user).\
                    option("password", password).\
                        save()

## 6. Leemos de AWS 

In [69]:
rd_df = spark.read.format("jdbc").option("driver", driver).\
    option("user", user).\
        option("password", password).\
            option("url", url).\
                option("dbtable", "my_conexion_spark.tabla_aws").load()

In [71]:
rd_df.filter(col("splitedDataExplode")=="Tomorrow").\
    groupby(col("splitedDataExplode")).\
        sum("count").\
            show()

+------------------+----------+
|splitedDataExplode|sum(count)|
+------------------+----------+
|          Tomorrow|         4|
+------------------+----------+

