# Conexion entre Pyspark y Postgres

In [1]:
import os

from pyspark.sql import SparkSession

# driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.5.2.jar" # old
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Postgres") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

# Postgres connection settings
pg_url = "jdbc:postgresql://172.18.0.2:5432/postgres"
pg_user = "postgres" # not recommended to have this value in the code
pg_password = "postgres" # not recommended to have this value in the code
pg_driver = "org.postgresql.Driver"

In [3]:
# Create the connection and read the table
df_agents = spark.read \
    .format("jdbc") \
    .option("url", pg_url) \
    .option("dbtable", "agents") \
    .option("user", pg_user) \
    .option("password", pg_password) \
    .option("driver", pg_driver) \
    .load()

df_agents.printSchema()
df_agents.show()

root
 |-- agentid: integer (nullable = true)
 |-- name: string (nullable = true)

+-------+------------------+
|agentid|              name|
+-------+------------------+
|      0|  Michele Williams|
|      1|    Jocelyn Parker|
|      2|Christopher Moreno|
|      3|       Todd Morrow|
|      4|       Randy Moore|
|      5|        Paul Nunez|
|      6|      Gloria Singh|
|      7|      Angel Briggs|
|      8|      Lisa Cordova|
|      9|        Dana Hardy|
|     10|           Agent X|
+-------+------------------+



In [4]:
# Create the connection and read the table
df_calls = spark.read \
    .format("jdbc") \
    .option("url", pg_url) \
    .option("dbtable", "calls") \
    .option("user", pg_user) \
    .option("password", pg_password) \
    .option("driver", pg_driver) \
    .load()

df_calls.printSchema()
df_calls.show()
df_calls.count()

root
 |-- callid: integer (nullable = true)
 |-- agentid: integer (nullable = true)
 |-- customerid: integer (nullable = true)
 |-- pickedup: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- productsold: integer (nullable = true)

+------+-------+----------+--------+--------+-----------+
|callid|agentid|customerid|pickedup|duration|productsold|
+------+-------+----------+--------+--------+-----------+
|     0|     10|       179|       0|       0|          0|
|     1|      5|       691|       1|     116|          0|
|     2|     10|        80|       1|     165|          0|
|     3|      6|       629|       1|     128|          0|
|     4|      8|       318|       1|     205|          0|
|     5|      7|       319|       1|     225|          1|
|     6|     10|       265|       1|     211|          0|
|     7|      9|       625|       0|       0|          0|
|     8|      5|       877|       0|       0|          0|
|     9|      5|       191|       1|     145|     

9940

In [5]:
# Create the connection and read the table
df_customers = spark.read \
    .format("jdbc") \
    .option("url", pg_url) \
    .option("dbtable", "customers") \
    .option("user", pg_user) \
    .option("password", pg_password) \
    .option("driver", pg_driver) \
    .load()

df_customers.printSchema()
df_customers.show()
df_customers.count()

root
 |-- customerid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- email: string (nullable = true)
 |-- company: string (nullable = true)
 |-- phonenumber: string (nullable = true)
 |-- age: integer (nullable = true)

+----------+------------------+--------------------+--------------------+--------------------+------------+---+
|customerid|              name|          occupation|               email|             company| phonenumber|age|
+----------+------------------+--------------------+--------------------+--------------------+------------+---+
|         0|      David Melton|          Unemployed|    DMelton@zoho.com|Morris, Winters a...|409-093-0748| 16|
|         1|  Michael Gonzalez|             Student|Gonzalez_Michael@...|  Hernandez and Sons|231-845-0673| 19|
|         2|     Amanda Wilson|             Student|Amanda.Wilson75@v...|Mooney, West and ...|844-276-4552| 18|
|         3|     Robert Thomas|Engineer, struc

1000

In [6]:
# Create temporary tables
df_agents.createOrReplaceTempView("agents")       # Agentes
df_calls.createOrReplaceTempView("calls")         # Llamadas
df_customers.createOrReplaceTempView("customers") # Clientes

## Ejercicio 1

Extraer agentes cuyo nombre empiezen por M o terminen en O

In [7]:
# SQL Select query
df_ej_1 = spark.sql("""
select * from agents
where name like 'M%' or name like '%o'
""")
df_ej_1.show()

+-------+------------------+
|agentid|              name|
+-------+------------------+
|      0|  Michele Williams|
|      2|Christopher Moreno|
+-------+------------------+



## Ejercicio 2
Escriba una consulta que produzca una lista, en orden alfabético,  de todas las distintas ocupaciones en la tabla Customer que contengan la palabra "Engineer".

In [8]:
df_ej_2 = spark.sql("""
SELECT DISTINCT Occupation
FROM customers
WHERE Occupation LIKE '%Engineer%'
ORDER BY Occupation
""")
df_ej_2.show()

+--------------------+
|          Occupation|
+--------------------+
|Engineer, aeronau...|
|Engineer, agricul...|
|Engineer, automotive|
|Engineer, biomedical|
|Engineer, broadca...|
|Engineer, buildin...|
|Engineer, civil (...|
|Engineer, civil (...|
|Engineer, communi...|
|Engineer, control...|
|  Engineer, drilling|
|Engineer, electrical|
|Engineer, electro...|
|    Engineer, energy|
|      Engineer, land|
|Engineer, mainten...|
|Engineer, mainten...|
|Engineer, manufac...|
|Engineer, manufac...|
| Engineer, materials|
+--------------------+
only showing top 20 rows



## Ejercicio 3

Escriba una consulta que devuelva el ID del cliente, su nombre y una columna  Mayor30 que contenga "Sí" si el cliente tiene más de 30 años y "No" en caso contrario.

In [9]:
df_ej_3 = spark.sql("""
SELECT CustomerID, Name,
    CASE
        WHEN Age >= 30 THEN 'Yes'
        WHEN Age <  30 THEN 'No'
        ELSE 'Missing Data'
    END AS Over30
FROM customers
ORDER BY Name DESC
""")
df_ej_3.show()

+----------+-----------------+------+
|CustomerID|             Name|Over30|
+----------+-----------------+------+
|       392|   Zachary Wilson|   Yes|
|       986|Zachary Stevenson|    No|
|       421|     Zachary Ruiz|   Yes|
|        18|     Zachary Howe|    No|
|       883| Zachary Anderson|    No|
|       952|    Yolanda White|    No|
|       715|   Yesenia Wright|    No|
|       699|    Willie Greene|   Yes|
|       860| William Thompson|    No|
|       289|    William Scott|    No|
|       866|     William Rice|    No|
|        58| William Mitchell|   Yes|
|       973|  William Jackson|   Yes|
|       126|     William Hess|   Yes|
|       966|   William Garcia|    No|
|       179|    William Davis|    No|
|       139|    William Adams|   Yes|
|       934|   Whitney Wright|    No|
|       893|   Wendy Thornton|   Yes|
|       885|    Wendy Freeman|    No|
+----------+-----------------+------+
only showing top 20 rows



## Ejercicio 4

Escriba una consulta que devuelva todas las llamadas realizadas a clientes de la  profesión de ingeniería y muestre si son mayores o menores de 30, así como si  terminaron comprando el producto de esa llamada.

In [10]:
df_ej_4 = spark.sql("""
SELECT CallID, Cu.CustomerID, Name, ProductSold,
    CASE
        WHEN Age >= 30 THEN 'Yes'
        WHEN Age <  30 THEN 'No'
        ELSE 'Missing Data'
    END AS Over30
FROM customers Cu
JOIN calls Ca ON Ca.CustomerID = Cu.CustomerID
WHERE Occupation LIKE '%Engineer%'
ORDER BY Name DESC
""")
df_ej_4.show()

+------+----------+---------------+-----------+------+
|CallID|CustomerID|           Name|ProductSold|Over30|
+------+----------+---------------+-----------+------+
|  1552|       699|  Willie Greene|          1|   Yes|
|  9844|       699|  Willie Greene|          0|   Yes|
|  1817|       699|  Willie Greene|          0|   Yes|
|  3688|       699|  Willie Greene|          0|   Yes|
|  4150|       699|  Willie Greene|          0|   Yes|
|  4900|       699|  Willie Greene|          1|   Yes|
|  5305|       699|  Willie Greene|          1|   Yes|
|  6191|       699|  Willie Greene|          0|   Yes|
|  6270|       699|  Willie Greene|          0|   Yes|
|  9705|       699|  Willie Greene|          0|   Yes|
|  9714|       699|  Willie Greene|          0|   Yes|
|  3113|       973|William Jackson|          0|   Yes|
|  8940|       973|William Jackson|          0|   Yes|
|  3162|       973|William Jackson|          0|   Yes|
|  9531|       973|William Jackson|          0|   Yes|
|  3780|  

## Ejercicio 5

Escriba dos consultas: una que calcule las ventas totales y las llamadas totales realizadas a los clientes de la profesión de ingeniería y otra que calcule las mismas métricas para toda la base de clientes

In [11]:
df_ej_5 = spark.sql("""
SELECT SUM(ProductSold) AS TotalSales, COUNT(*) AS NCalls
FROM customers Cu
JOIN calls Ca ON Ca.CustomerID = Cu.CustomerID
WHERE Occupation LIKE '%Engineer%'
""")
df_ej_5.show()

+----------+------+
|TotalSales|NCalls|
+----------+------+
|       502|  2483|
+----------+------+



## Ejercicio 6

Escriba una consulta que devuelva, para cada agente, el nombre del agente, la cantidad de llamadas, las llamadas más largas y más cortas, la duración promedio de las llamadas y la cantidad total de productos vendidos. Nombra las columnas AgentName, NCalls, Shortest, Longest, AvgDuration y TotalSales.

Luego ordena la tabla por AgentName en orden alfabético.  (Asegúrese de incluir la cláusula WHERE PickedUp = 1 para calcular solo el promedio de todas las llamadas que fueron atendidas (de lo contrario, ¡todas las duraciones mínimas serán 0)!)

In [12]:
df_ej_6 = spark.sql("""
SELECT Name AS AgentName, COUNT(*) AS NCalls, MIN(Duration) AS Shortest, MAX(Duration) AS Longest, ROUND(AVG(Duration),2) AS AvgDuration, SUM(ProductSold) AS TotalSales
FROM calls C
    JOIN agents A ON C.AgentID = A.AgentID
WHERE PickeDup = 1
GROUP BY Name
ORDER BY Name
""")
df_ej_6.show()

+------------------+------+--------+-------+-----------+----------+
|         AgentName|NCalls|Shortest|Longest|AvgDuration|TotalSales|
+------------------+------+--------+-------+-----------+----------+
|           Agent X|   640|      22|    334|     180.98|       194|
|      Angel Briggs|   591|      12|    362|     181.08|       157|
|Christopher Moreno|   649|      47|    363|     177.98|       189|
|        Dana Hardy|   554|      49|    356|      177.2|       182|
|      Gloria Singh|   662|      36|    349|     182.18|       209|
|    Jocelyn Parker|   621|      40|    336|     180.33|       184|
|      Lisa Cordova|   639|      46|    344|     179.21|       201|
|  Michele Williams|   685|      22|    306|     177.88|       198|
|        Paul Nunez|   648|      -5|    323|     181.07|       194|
|       Randy Moore|   600|      16|    326|      178.6|       177|
|       Todd Morrow|   631|      -3|    339|     180.71|       204|
+------------------+------+--------+-------+----

## Ejercicio 7

Dos métricas del desempeño de los agentes de ventas que le interesan a su empresa son: 
1. para cada agente, cuántos segundos en promedio les toma vender un producto cuando tienen éxito
2. para cada agente, cuántos segundos en promedio permanecen en el teléfono antes de darse por vencidos cuando no tienen éxito. Escribe una consulta que calcule esto

In [13]:
df_ej_7 = spark.sql("""
SELECT a.name,
SUM(
   CASE
       WHEN productsold = 0 THEN duration
       ELSE 0
   END)/SUM(
   CASE
       WHEN productsold = 0 THEN 1
       ELSE 0
   END)
AS avgWhenNotSold ,
SUM(
   CASE
       WHEN productsold = 1 THEN duration
       ELSE 0
   END)/SUM(
       CASE WHEN productsold = 1 THEN 1
       ELSE 0
   END)
AS avgWhenSold
FROM calls c
JOIN agents a ON c.agentid = a.agentid
GROUP BY a.name
ORDER BY 1
""")
df_ej_7.show()

+------------------+------------------+------------------+
|              name|    avgWhenNotSold|       avgWhenSold|
+------------------+------------------+------------------+
|           Agent X|109.81705639614856|             185.5|
|      Angel Briggs|108.66160220994475|180.56050955414014|
|Christopher Moreno| 112.4882108183079|182.03703703703704|
|        Dana Hardy| 97.73082706766917|182.30769230769232|
|      Gloria Singh|115.41143654114366|181.10047846889952|
|    Jocelyn Parker|118.99848484848485| 181.7608695652174|
|      Lisa Cordova|110.09470752089136|176.46766169154228|
|  Michele Williams|111.77763496143959|176.18686868686868|
|        Paul Nunez|113.24380165289256| 181.0257731958763|
|       Randy Moore|107.28611898016997|177.47457627118644|
|       Todd Morrow|109.15677966101696|180.12745098039215|
+------------------+------------------+------------------+

