In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## Leer los datos

In [2]:
df = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").load("cubo_ventas/fact_item.txt")

In [3]:
df_clientes = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").load("cubo_ventas/dim_customer.txt")

In [4]:
df.head()

Row(dwh_country_id='276', fk_sales_order_item='88056', fk_sales_order='5642', local_unit_price='1.29', unit_price_eur='1.29', local_tax_amount='0.08', tax_amount_eur='0.08', local_paid_price='1.29', paid_price_eur='1.29', local_paid_price_net='1.21', paid_price_net_eur='1.21', local_coupon_money_value='0', coupon_money_value_eur='0', local_coupon_money_value_net='0', coupon_money_value_eur_net='0', local_original_price='1.29', original_unit_price_eur='0.89', fk_dim_catalog='1676', fk_dim_date='9/28/2015 12:00:00 AM', fk_dim_customer='194', fk_dim_order='6142', order_bi_created_at='9/28/2015 8:33:17 AM', order_bi_updated_at='1/26/2016 3:55:59 PM', order_item_bi_created_at='9/28/2015 8:33:17 AM', order_item_bi_updated_at='9/28/2015 8:34:04 AM')

In [5]:
df.printSchema()

root
 |-- dwh_country_id: string (nullable = true)
 |-- fk_sales_order_item: string (nullable = true)
 |-- fk_sales_order: string (nullable = true)
 |-- local_unit_price: string (nullable = true)
 |-- unit_price_eur: string (nullable = true)
 |-- local_tax_amount: string (nullable = true)
 |-- tax_amount_eur: string (nullable = true)
 |-- local_paid_price: string (nullable = true)
 |-- paid_price_eur: string (nullable = true)
 |-- local_paid_price_net: string (nullable = true)
 |-- paid_price_net_eur: string (nullable = true)
 |-- local_coupon_money_value: string (nullable = true)
 |-- coupon_money_value_eur: string (nullable = true)
 |-- local_coupon_money_value_net: string (nullable = true)
 |-- coupon_money_value_eur_net: string (nullable = true)
 |-- local_original_price: string (nullable = true)
 |-- original_unit_price_eur: string (nullable = true)
 |-- fk_dim_catalog: string (nullable = true)
 |-- fk_dim_date: string (nullable = true)
 |-- fk_dim_customer: string (nullable = tru

In [7]:
df.describe('dwh_country_id').show()

+-------+--------------+
|summary|dwh_country_id|
+-------+--------------+
|  count|        175909|
|   mean|         276.0|
| stddev|           0.0|
|    min|           276|
|    max|           276|
+-------+--------------+



## SQL API

In [8]:
df.createOrReplaceTempView("fact_item")
df_clientes.createOrReplaceTempView("dim_customer")
users = sqlContext.sql("SELECT channel,fk_sales_order_item \
                       FROM fact_item join dim_customer on fk_dim_customer=id_dim_customer limit 10")
users.show()

+-----------------+-------------------+
|          channel|fk_sales_order_item|
+-----------------+-------------------+
|Social Media Paid|              88056|
|Social Media Paid|              88055|
|Social Media Paid|              88054|
|Social Media Paid|              88053|
|Social Media Paid|              88052|
|Social Media Paid|              88051|
|Social Media Paid|              91950|
|Social Media Paid|              91949|
|Social Media Paid|              91948|
|Social Media Paid|              91947|
+-----------------+-------------------+



### 1. Perfilamiento de datos

Realice un perfilamiento de datos para 4 columnas de fact_item que hagan referencia a una métrica. 

### 2. Ejercicio SQL:

a.Ventas (fact_item.paid_price_total_eur) por canal de mercadeo (dim_custome.channel)


In [16]:
df.createOrReplaceTempView("fact_item")
df_clientes.createOrReplaceTempView("dim_customer")
users = sqlContext.sql("SELECT sum(paid_price_total_eur) suma, channel \
                       FROM fact_item join dim_customer on fk_dim_customer=id_dim_customer  \
                       GROUP BY channel limit 15")
users.show()

+------------------+--------------------+
|              suma|             channel|
+------------------+--------------------+
| 3703966.639999915|                 SEO|
| 978618.4399999831|       SEM Non Brand|
|1916225.2000000416|                null|
| 299048.4000000016|             Display|
| 292294.3899999953|               Email|
| 4100295.429999952|              Direct|
|1825386.8799999857|            Referral|
| 93198.88999999981|         Retargeting|
|2265328.2500000363|   Social Media Paid|
|2096564.8900000483|           SEM Brand|
| 83203.26000000064|Social Media Reta...|
|427908.69000000384| Social Media Unpaid|
|232315.44999999757|        Other Unpaid|
+------------------+--------------------+



b.Número de clientes por método de pago (dim_order.payment_method)

In [24]:
d_order = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").load("cubo_ventas/dim_order.txt")
d_order.createOrReplaceTempView("dim_order")
users = sqlContext.sql("SELECT count(distinct fk_dim_customer) cantidad, payment_method metodo \
                       FROM fact_item join dim_order on fk_dim_order=id_dim_order \
                       GROUP BY payment_method ")
users.show()

+-------------------------------+--------------------+
|count(DISTINCT fk_dim_customer)|      payment_method|
+-------------------------------+--------------------+
|                            169|           NoPayment|
|                            559|                 COD|
|                           1727|    Adyen_CreditCard|
|                            853|   Adyen_DirectDebit|
|                              7|Adyen_PaypalRecur...|
|                           2407|        Adyen_Paypal|
+-------------------------------+--------------------+



c.Ranking de productores (dim_catalog.producer) por ventas (fact_item.paid_price_net_eur)

In [31]:
d_catalog = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").load("cubo_ventas/dim_catalog.txt")
d_catalog.createOrReplaceTempView("dim_catalog")
users = sqlContext.sql("SELECT SUM(paid_price_net_eur) total, producer \
                       FROM fact_item join dim_catalog on fk_dim_catalog=id_catalog \
                       GROUP BY producer \
                       ORDER BY SUM(paid_price_net_eur) DESC")
users.show()


+------------------+--------------------+
|             total|            producer|
+------------------+--------------------+
|137238.74000000022|                null|
|  33570.7799999998|Premium Frischeha...|
| 21160.70000000003|            Bonativo|
|19717.780000000203|   Bio Frischehandel|
|14257.149999999983|Fleischerei Erchi...|
|13991.819999999945|Landfleischerei R...|
|12720.740000000049|     Blomeyer's Käse|
| 7874.530000000034|     Ökohof Kuhhorst|
| 7648.189999999935|         BioBackHaus|
|7338.1100000000315|  Ökohof Lemke&Kluge|
|  6049.91000000008|    Ökodorf Brodowin|
| 5949.770000000017|   Die Müritzfischer|
| 5345.660000000105|         Hemme Milch|
| 4811.189999999996|Gemüsebaubetrieb ...|
| 4508.089999999997|  Fleischerei Bünger|
| 4130.770000000015|       Bauer Nietsch|
| 4062.100000000058|   Gläserne Molkerei|
|3758.6999999999994|Bauernkäserei Wol...|
|3620.9300000000135|Ökohof Teltower R...|
|3007.5299999999993|Berliner Lachsman...|
+------------------+--------------

d. Ranking de clientes por ventas (fact_item.paid_price_total_eur

In [34]:
users = sqlContext.sql("SELECT SUM(paid_price_net_eur) total, fk_dim_customer idcliente \
                       FROM fact_item  \
                       GROUP BY fk_dim_customer \
                       ORDER BY SUM(paid_price_net_eur) DESC")
users.show()

+------------------+---------+
|             total|idcliente|
+------------------+---------+
| 4053.400000000003|     4477|
|3820.5499999999915|     1265|
| 3699.800000000018|     5416|
| 3081.760000000001|     3654|
| 3011.999999999993|     2750|
|2855.6599999999867|     5144|
|2698.2199999999916|     3806|
|2362.8800000000065|      535|
|2090.4400000000105|      586|
|2034.4099999999987|     5064|
|  1914.04999999998|     3407|
|1889.9899999999936|     2751|
|1827.1699999999946|     1175|
|1813.3299999999913|     3918|
|1662.9399999999964|     1964|
|1631.5300000000007|     3815|
|1574.3899999999946|     5776|
|1563.4600000000055|     2196|
|1523.5600000000004|     3445|
|1498.3500000000042|      946|
+------------------+---------+
only showing top 20 rows



e. Número de órdenes con más de 3 items y con valor pagado por item mayor a 30 euros (fact_item.paid_price_total_eur)

In [66]:
users = sqlContext.sql("SELECT count(*) total,FK_DIM_ORDER \
                       FROM fact_item \
                       WHERE paid_price_net_eur>30 \
                       GROUP BY FK_DIM_ORDER \
                       HAVING total> 1")
users.show(15)
users.count()

+-----+------------+
|total|FK_DIM_ORDER|
+-----+------------+
|    2|         718|
|    3|        5965|
|    2|        5863|
|    3|        6458|
|    2|        2247|
|    2|        2253|
|    2|        6919|
|    2|        5967|
|    2|        2251|
|    2|        2250|
|    4|        6996|
|    2|        2268|
|    2|        5959|
|    3|        2267|
|    2|        5861|
+-----+------------+
only showing top 15 rows



41

### Ejercicios PySpark

Elija dos de las preguntas del punto anterior y respóndalas utilizando las funciones de PySpark
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/
