# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect Jupyter notebooks to a Spark cluster to process data using Spark Python API.

## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster, create a SparkSession object with the following params:

+ **appName:** application name displayed at the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, same used by Spark Workers;
+ **spark.executor.memory:** must be less than or equals to docker compose SPARK_WORKER_MEMORY config.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PysparkExample").config ("spark.sql.shuffle.partitions", "50").config("spark.driver.maxResultSize","5g").config ("spark.sql.execution.arrow.enabled", "true").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/16 11:53:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

More confs for SparkSession object in standalone mode can be added using the **config** method. Checkout the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

In [4]:
venta = spark.read.option("compression.codec", "snappy").option("mergeSchema", "true").parquet("/home/pc/Documentos/henry/hadoopProyecto/Datasets2/venta/000000_0")

                                                                                

In [5]:
venta.count()

                                                                                

46645

In [8]:
len(venta.columns)

10

In [9]:
venta.printSchema()

root
 |-- idventa: integer (nullable = true)
 |-- fecha: date (nullable = true)
 |-- fecha_entrega: date (nullable = true)
 |-- idcanal: integer (nullable = true)
 |-- idcliente: integer (nullable = true)
 |-- idsucursal: integer (nullable = true)
 |-- idempleado: integer (nullable = true)
 |-- idproducto: integer (nullable = true)
 |-- precio: float (nullable = true)
 |-- cantidad: integer (nullable = true)



In [10]:
venta.show(n=10)

+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+
|idventa|     fecha|fecha_entrega|idcanal|idcliente|idsucursal|idempleado|idproducto|precio|cantidad|
+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+
|      1|2018-03-09|   2018-03-17|      3|      969|        13|      1674|     42817|813.12|       2|
|      2|2018-12-28|   2018-12-29|      2|      884|        13|      1674|     42795|543.18|       3|
|      3|2016-03-28|   2016-03-31|      2|     1722|        13|      1674|     42837|430.32|       1|
|      4|2017-10-23|   2017-10-24|      3|     2876|        13|      1674|     42834|818.84|       2|
|      5|2017-11-22|   2017-11-25|      2|      678|        13|      1674|     42825|554.18|       3|
|      6|2018-01-24|   2018-01-25|      2|     3263|        13|      1674|     42852| 152.0|       1|
|      7|2015-03-25|   2015-03-26|      3|     2983|        13|      1674|     429

In [21]:
from pyspark.sql.functions import mean, stddev  
from pyspark.sql.functions import col

In [14]:
# calculate statistics
ventas_out = venta.na.drop(subset=['precio','cantidad']).groupBy("idproducto").agg(mean(venta.precio).alias("promedio"), stddev(venta.precio).alias("stddev"))

In [15]:
ventas_out = ventas_out.withColumn("PrecioMaximo", ventas_out.promedio + ventas_out.stddev * 3).withColumn("PrecioMinimo", ventas_out.promedio - ventas_out.stddev * 3)

In [16]:
ventas_out.show(n=10)

+----------+------------------+------------------+------------------+-------------------+
|idproducto|          promedio|            stddev|      PrecioMaximo|       PrecioMinimo|
+----------+------------------+------------------+------------------+-------------------+
|     42834|1772.5477912454044| 8766.698479843624|28072.643230776277| -24527.54764828547|
|     42991|1656.0337078651685|7184.8999426086675| 23210.73353569117|-19898.666119960835|
|     42861|2431.3661971830984| 9924.897685292788|32206.059253061463|-27343.326858695265|
|     43040|             757.0|               0.0|             757.0|              757.0|
|     43010| 579.5276381909548|2715.9365607484365| 8727.337320436265| -7568.282044054355|
|     43022|             675.2| 3252.412888918011|10432.438666754035| -9082.038666754033|
|     43026|1884.0645161290322| 8346.479364859078|26923.502610706266|-23155.373578448205|
|     42822|2014.9213189019097| 9806.281581842579|31433.766064429645|-27403.923426625825|
|     4301

In [17]:
ventas_out.write.option("compression.codec", "snappy").option("mergeSchema", "true").parquet("data2/venta_criterio_outliers", mode="overwrite")

                                                                                

In [18]:
venta = venta.alias("v").join(ventas_out.alias("o"), venta['idproducto'] == ventas_out['idproducto']).select("v.idventa","v.fecha","v.fecha_entrega","v.idcanal","v.idcliente","v.idsucursal","v.idempleado","v.idproducto","v.precio","v.cantidad","o.promedio","o.stddev","o.PrecioMaximo","o.PrecioMinimo")

In [19]:
venta.printSchema()

root
 |-- idventa: integer (nullable = true)
 |-- fecha: date (nullable = true)
 |-- fecha_entrega: date (nullable = true)
 |-- idcanal: integer (nullable = true)
 |-- idcliente: integer (nullable = true)
 |-- idsucursal: integer (nullable = true)
 |-- idempleado: integer (nullable = true)
 |-- idproducto: integer (nullable = true)
 |-- precio: float (nullable = true)
 |-- cantidad: integer (nullable = true)
 |-- promedio: double (nullable = true)
 |-- stddev: double (nullable = true)
 |-- PrecioMaximo: double (nullable = true)
 |-- PrecioMinimo: double (nullable = true)



In [20]:
venta.withColumn("PrecioMaximo",col("PrecioMaximo").cast("float"))

NameError: name 'col' is not defined

In [126]:
venta.withColumn("PrecioMinimo",col("PrecioMinimo").cast("float"))

DataFrame[idventa: int, fecha: date, fecha_entrega: date, idcanal: int, idcliente: int, idsucursal: int, idempleado: int, idproducto: int, precio: float, cantidad: int, promedio: double, stddev: double, PrecioMaximo: double, PrecioMinimo: float]

In [127]:
def detecta_outlier(valor, maximo, minimo):
    return (valor < minimo) or (valor > maximo)

In [128]:
udf_detecta_outlier = udf(lambda valor, MaxLimit, MinLimit: detecta_outlier(valor, MaxLimit, MinLimit), BooleanType())

In [129]:
venta.count()

                                                                                

46645

In [130]:
venta = venta.na.drop(subset=['precio','cantidad'])

In [131]:
venta.count()

                                                                                

44845

In [132]:
venta = venta.withColumn("esOutlier", udf_detecta_outlier(venta.precio, venta.PrecioMaximo, venta.PrecioMinimo)).filter("NOT esOutlier")

In [133]:
venta.show(n=10)

                                                                                

+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+------------------+------------------+------------------+-------------------+---------+
|idventa|     fecha|fecha_entrega|idcanal|idcliente|idsucursal|idempleado|idproducto|precio|cantidad|          promedio|            stddev|      PrecioMaximo|       PrecioMinimo|esOutlier|
+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+------------------+------------------+------------------+-------------------+---------+
|      1|2018-03-09|   2018-03-17|      3|      969|        13|      1674|     42817|813.12|       2|2294.6944737346626|10853.233513624373|34854.395014607784|-30265.006067138456|    false|
|      2|2018-12-28|   2018-12-29|      2|      884|        13|      1674|     42795|543.18|       3| 2491.543036419412|10085.326471187416| 32747.52244998166| -27764.43637714284|    false|
|      3|2016-03-28|   2016-03-31|      2|     1722|   

In [134]:
venta = venta.select(["idventa","fecha","fecha_entrega","idcanal","idcliente","idsucursal","idempleado","idproducto","precio","cantidad"])

In [135]:
venta.write.option("compression.codec", "snappy").option("mergeSchema", "true").parquet("data2/venta_sin_outliers", mode="overwrite")

                                                                                