In [1]:
import os

# Point PySpark at the local Spark and Hadoop installations.
# NOTE(review): these are absolute, machine-specific Windows paths — adjust per environment.
# NOTE(review): the PYSPARK_DRIVER_PYTHON* variables are presumably read by the `pyspark`
# launcher script; setting them from inside an already-running kernel may have no effect — TODO confirm.
os.environ.update({
    'SPARK_HOME': 'C:/spark/',
    'HADOOP_HOME': 'C:/hadoop/',
    'PYSPARK_DRIVER_PYTHON': 'jupyter',
    'PYSPARK_DRIVER_PYTHON_OPTS': 'lab',
    'PYSPARK_PYTHON': 'python',
})

In [2]:
from pyspark.sql import SparkSession

# Start a Spark session for this notebook (getOrCreate reuses an existing one if present).
spark = (
    SparkSession.builder
    .appName("App1")
    .getOrCreate()
)

# Options shared by every input file: first row is the header, column types are inferred.
CSV_OPTIONS = {"header": True, "inferSchema": True}

# Load the three source tables (paths are relative).
customers = spark.read.csv("customers.csv", **CSV_OPTIONS)
agents = spark.read.csv("agents.csv", **CSV_OPTIONS)
calls = spark.read.csv("calls.csv", **CSV_OPTIONS)

# Preview the first five customers.
# NOTE(review): the preview prints names, emails and phone numbers — redact before sharing if the data is real.
customers.show(5)

+----------+----------------+--------------------+--------------------+--------------------+------------+---+
|customerid|            name|          occupation|               email|             company| phonenumber|Age|
+----------+----------------+--------------------+--------------------+--------------------+------------+---+
|         0|    David Melton|          Unemployed|    DMelton@zoho.com|Morris, Winters a...|409-093-0748| 16|
|         1|Michael Gonzalez|             Student|Gonzalez_Michael@...|  Hernandez and Sons|231-845-0673| 19|
|         2|   Amanda Wilson|             Student|Amanda.Wilson75@v...|Mooney, West and ...|844-276-4552| 18|
|         3|   Robert Thomas|Engineer, structural| RThomas@xfinity.com|      Johnson-Gordon|410-404-8000| 25|
|         4|      Eddie Hall|             Surgeon|EddieHall@outlook...|          Dawson LLC|872-287-2196| 30|
+----------+----------------+--------------------+--------------------+--------------------+------------+---+
only showing top 5 rows

# Ejemplo de operación OLTP: inserción de nuevas llamadas

In [4]:
# Hand-written rows to append to `calls`, in the column order of calls.columns:
# (callid, agentid, customerid, pickedup, duration, productsold)
insert_data = [
    (10000, 4, 6, 1, 130, 1),
    (10001, 5, 7, 1, 131, 0),
    (10002, 10, 260, 0, 0, 0),
    (10003, 3, 5, 1, 60, 1),
    (10004, 10, 731, 1, 90, 0),
    (10005, 4, 415, 0, 0, 0)
]
# Reuse the full schema of `calls` (names AND types), not just its column names.
# With names only, Spark infers bigint for Python ints, so a later union with the
# CSV-inferred columns would silently widen the combined result to bigint.
# NOTE(review): assumes inferSchema typed every `calls` column as an integer type
# (the sample output shows only whole numbers) — createDataFrame validates this.
schema = calls.schema
# DataFrame of new rows to insert
insert_df = spark.createDataFrame(insert_data, schema=schema)
insert_df.show()

+------+-------+----------+--------+--------+-----------+
|callid|agentid|customerid|pickedup|duration|productsold|
+------+-------+----------+--------+--------+-----------+
| 10000|      4|         6|       1|     130|          1|
| 10001|      5|         7|       1|     131|          0|
| 10002|     10|       260|       0|       0|          0|
| 10003|      3|         5|       1|      60|          1|
| 10004|     10|       731|       1|      90|          0|
| 10005|      4|       415|       0|       0|          0|
+------+-------+----------+--------+--------+-----------+



In [5]:
# Append the new rows to the original calls.
# unionByName matches columns by name; plain union() matches purely by position, so a
# column-order mismatch would silently shift values into the wrong columns.
calls_final = calls.unionByName(insert_df)
calls_final.show(n=10)

+------+-------+----------+--------+--------+-----------+
|callid|agentid|customerid|pickedup|duration|productsold|
+------+-------+----------+--------+--------+-----------+
|     0|     10|       179|       0|       0|          0|
|     1|      5|       691|       1|     116|          0|
|     2|     10|        80|       1|     165|          0|
|     3|      6|       629|       1|     128|          0|
|     4|      8|       318|       1|     205|          0|
|     5|      7|       319|       1|     225|          1|
|     6|     10|       265|       1|     211|          0|
|     7|      9|       625|       0|       0|          0|
|     8|      5|       877|       0|       0|          0|
|     9|      5|       191|       1|     145|          0|
+------+-------+----------+--------+--------+-----------+
only showing top 10 rows



In [7]:
# Peek at the last five rows of the combined calls (tail() collects them to the driver).
calls_final.tail(num=5)

[Row(callid=10001, agentid=5, customerid=7, pickedup=1, duration=131, productsold=0),
 Row(callid=10002, agentid=10, customerid=260, pickedup=0, duration=0, productsold=0),
 Row(callid=10003, agentid=3, customerid=5, pickedup=1, duration=60, productsold=1),
 Row(callid=10004, agentid=10, customerid=731, pickedup=1, duration=90, productsold=0),
 Row(callid=10005, agentid=4, customerid=415, pickedup=0, duration=0, productsold=0)]

# Ejercicio OLAP: venta más rápida de cada agente

In [8]:
# Expose the DataFrames to Spark SQL as temporary views.
# NOTE(review): the "calls" view is created from `calls`, not `calls_final`, so the rows
# appended in the OLTP example are not part of this query — confirm that is intended.
calls.createOrReplaceTempView("calls")
agents.createOrReplaceTempView("agents")
customers.createOrReplaceTempView("customers")

# For each agent: the shortest call that ended in a sale, with agent and customer names.
# Written with named CTEs instead of nested subqueries, and without the former alias `min`,
# which shadowed the name of the built-in aggregate function.
result = spark.sql("""
    WITH fastest_sale AS (
        -- Shortest duration among each agent's calls with productsold = 1.
        SELECT agentid, min(duration) AS fastestcall
        FROM calls
        WHERE productsold = 1
        GROUP BY agentid
    ),
    fastest_sale_call AS (
        -- One row per agent at that duration; when several sales tie,
        -- the highest customerid is kept.
        SELECT c.agentid, c.duration, max(c.customerid) AS cid
        FROM fastest_sale f
        JOIN calls c
          ON c.agentid = f.agentid
         AND c.duration = f.fastestcall
        WHERE c.productsold = 1
        GROUP BY c.agentid, c.duration
    )
    SELECT a.name AS AgentName, cu.name AS CustomerName, x.duration
    FROM fastest_sale_call x
    JOIN agents a ON x.agentid = a.agentid
    JOIN customers cu ON cu.customerid = x.cid
""")

# Show the result (row order is not guaranteed without ORDER BY).
result.show()

+------------------+---------------+--------+
|         AgentName|   CustomerName|duration|
+------------------+---------------+--------+
|       Randy Moore|Alexandra Allen|      49|
|      Lisa Cordova| Carlos Bennett|      57|
|      Angel Briggs|Brandy Ferguson|      47|
|        Dana Hardy|Erin Mccullough|      51|
|        Paul Nunez| Matthew Martin|      44|
|       Todd Morrow|  Shari Barnett|      37|
|      Gloria Singh|     Jenny Dean|      36|
|           Agent X|  Daniel Hughes|      22|
|Christopher Moreno|    John George|      55|
|  Michele Williams|Matthew Schultz|      62|
|    Jocelyn Parker|   William Rice|      72|
+------------------+---------------+--------+



In [14]:
from pyspark.sql.functions import col

# Same result, ordered from the longest to the shortest fastest-sale duration.
result.orderBy(col("duration").desc()).show()

+------------------+---------------+--------+
|         AgentName|   CustomerName|duration|
+------------------+---------------+--------+
|    Jocelyn Parker|   William Rice|      72|
|  Michele Williams|Matthew Schultz|      62|
|      Lisa Cordova| Carlos Bennett|      57|
|Christopher Moreno|    John George|      55|
|        Dana Hardy|Erin Mccullough|      51|
|       Randy Moore|Alexandra Allen|      49|
|      Angel Briggs|Brandy Ferguson|      47|
|        Paul Nunez| Matthew Martin|      44|
|       Todd Morrow|  Shari Barnett|      37|
|      Gloria Singh|     Jenny Dean|      36|
|           Agent X|  Daniel Hughes|      22|
+------------------+---------------+--------+

