In [2]:
import pyspark
sc = pyspark.SparkContext('local[*]')

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## Leer los datos

In [3]:
df = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").load("cubo_ventas/fact_item.txt")

In [4]:
df_clientes = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").load("cubo_ventas/dim_customer.txt")

In [15]:
df_order = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").load("cubo_ventas/dim_order.txt")

In [20]:
df_catalog = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").load("cubo_ventas/dim_catalog.txt")

In [5]:
df.head()

Row(dwh_country_id='276', fk_sales_order_item='88056', fk_sales_order='5642', local_unit_price='1.29', unit_price_eur='1.29', local_tax_amount='0.08', tax_amount_eur='0.08', local_paid_price='1.29', paid_price_eur='1.29', local_paid_price_net='1.21', paid_price_net_eur='1.21', local_coupon_money_value='0', coupon_money_value_eur='0', local_coupon_money_value_net='0', coupon_money_value_eur_net='0', local_original_price='1.29', original_unit_price_eur='0.89', fk_dim_catalog='1676', fk_dim_date='9/28/2015 12:00:00 AM', fk_dim_customer='194', fk_dim_order='6142', order_bi_created_at='9/28/2015 8:33:17 AM', order_bi_updated_at='1/26/2016 3:55:59 PM', order_item_bi_created_at='9/28/2015 8:33:17 AM', order_item_bi_updated_at='9/28/2015 8:34:04 AM')

In [9]:
df.printSchema()

root
 |-- dwh_country_id: string (nullable = true)
 |-- fk_sales_order_item: string (nullable = true)
 |-- fk_sales_order: string (nullable = true)
 |-- local_unit_price: string (nullable = true)
 |-- unit_price_eur: string (nullable = true)
 |-- local_tax_amount: string (nullable = true)
 |-- tax_amount_eur: string (nullable = true)
 |-- local_paid_price: string (nullable = true)
 |-- paid_price_eur: string (nullable = true)
 |-- local_paid_price_net: string (nullable = true)
 |-- paid_price_net_eur: string (nullable = true)
 |-- local_coupon_money_value: string (nullable = true)
 |-- coupon_money_value_eur: string (nullable = true)
 |-- local_coupon_money_value_net: string (nullable = true)
 |-- coupon_money_value_eur_net: string (nullable = true)
 |-- local_original_price: string (nullable = true)
 |-- original_unit_price_eur: string (nullable = true)
 |-- fk_dim_catalog: string (nullable = true)
 |-- fk_dim_date: string (nullable = true)
 |-- fk_dim_customer: string (nullable = tru

In [16]:
df.describe('dwh_country_id').show()

+-------+--------------+
|summary|dwh_country_id|
+-------+--------------+
|  count|        175909|
|   mean|         276.0|
| stddev|           0.0|
|    min|           276|
|    max|           276|
+-------+--------------+



## SQL API

In [15]:
df.createOrReplaceTempView("fact_item")
df_clientes.createOrReplaceTempView("dim_customer")
users = sqlContext.sql("SELECT channel,fk_sales_order_item \
                       FROM fact_item join dim_customer on fk_dim_customer=id_dim_customer limit 10")
users.show()

+-----------------+-------------------+
|          channel|fk_sales_order_item|
+-----------------+-------------------+
|Social Media Paid|              88056|
|Social Media Paid|              88055|
|Social Media Paid|              88054|
|Social Media Paid|              88053|
|Social Media Paid|              88052|
|Social Media Paid|              88051|
|Social Media Paid|              91950|
|Social Media Paid|              91949|
|Social Media Paid|              91948|
|Social Media Paid|              91947|
+-----------------+-------------------+



### 1. Perfilamiento de datos

Realice un perfilamiento de datos para 4 columnas de fact_item que hagan referencia a una métrica. 

### 2. Ejercicio SQL:

a.Ventas (fact_item.paid_price_total_eur) por canal de mercadeo (dim_custome.channel)


In [14]:
df.createOrReplaceTempView("fact_item")
df_clientes.createOrReplaceTempView("dim_customer")
users = sqlContext.sql("SELECT channel, SUM(paid_price_total_eur() \
                       FROM fact_item join dim_customer on fk_dim_customer=id_dim_customer GROUP BY channel")
users.show()

+--------------------+-----------------------------------------+
|             channel|sum(CAST(paid_price_total_eur AS DOUBLE))|
+--------------------+-----------------------------------------+
|                 SEO|                        3703966.639999915|
|       SEM Non Brand|                        978618.4399999831|
|                null|                       1916225.2000000416|
|             Display|                        299048.4000000016|
|               Email|                        292294.3899999953|
|              Direct|                        4100295.429999952|
|            Referral|                       1825386.8799999857|
|         Retargeting|                        93198.88999999981|
|   Social Media Paid|                       2265328.2500000363|
|           SEM Brand|                       2096564.8900000483|
|Social Media Reta...|                        83203.26000000064|
| Social Media Unpaid|                       427908.69000000384|
|        Other Unpaid|   

b.Número de clientes por método de pago (dim_order.payment_method)

In [36]:
df_order.createOrReplaceTempView("dim_order")
df.createOrReplaceTempView("fact_item")
users = sqlContext.sql("SELECT COUNT(DISTINCT fk_dim_order) as numeroClientes, payment_method \
                       FROM fact_item join dim_order on fk_dim_order=id_dim_order GROUP BY payment_method")
users.show()

+--------------+--------------------+
|numeroClientes|      payment_method|
+--------------+--------------------+
|           275|           NoPayment|
|           833|                 COD|
|          1463|   Adyen_DirectDebit|
|          4102|    Adyen_CreditCard|
|             7|Adyen_PaypalRecur...|
|          5100|        Adyen_Paypal|
+--------------+--------------------+



c.Ranking de productores (dim_catalog.producer) por ventas (fact_item.paid_price_total_eur)

In [49]:
df.createOrReplaceTempView("fact_item")
df_catalog.createOrReplaceTempView("dim_catalog")
users = sqlContext.sql("SELECT producer, SUM(paid_price_eur) as ventas  \
                       FROM fact_item join dim_catalog on fk_dim_catalog=id_catalog GROUP BY producer ORDER BY ventas DESC")
users.show()

+--------------------+------------------+
|            producer|            ventas|
+--------------------+------------------+
|                null|148963.20999999714|
|Premium Frischeha...| 36086.92000000164|
|            Bonativo|23250.919999999896|
|   Bio Frischehandel| 21091.59999999932|
|Fleischerei Erchi...| 15254.66999999988|
|Landfleischerei R...| 14971.79999999999|
|     Blomeyer's Käse|13623.400000000085|
|     Ökohof Kuhhorst| 8423.879999999925|
|         BioBackHaus| 8183.139999999915|
|  Ökohof Lemke&Kluge| 7849.339999999884|
|    Ökodorf Brodowin| 6470.149999999893|
|   Die Müritzfischer|  6383.88999999997|
|         Hemme Milch| 6009.739999999879|
|Gemüsebaubetrieb ...| 5147.670000000027|
|  Fleischerei Bünger| 4824.249999999999|
|       Bauer Nietsch| 4511.310000000089|
|   Gläserne Molkerei| 4341.169999999968|
|Bauernkäserei Wol...|4020.1799999999826|
|Ökohof Teltower R...|3871.1699999999155|
|Berliner Lachsman...|3218.0399999999972|
+--------------------+------------

d. Ranking de clientes por ventas (fact_item.paid_price_total_eur

In [48]:

users = sqlContext.sql("SELECT id_dim_customer, SUM(paid_price_eur) as ventas  \
                       FROM fact_item join dim_customer on fk_dim_customer=id_dim_customer GROUP BY id_dim_customer ORDER BY ventas DESC")
users.show()

+---------------+------------------+
|id_dim_customer|            ventas|
+---------------+------------------+
|           1265| 4097.139999999961|
|           5416| 3990.409999999939|
|           3654|3367.3500000000004|
|           2750| 3226.549999999975|
|           5144| 3061.189999999996|
|            535|2548.0499999999956|
|            586|2326.9699999999907|
|           5064| 2179.300000000002|
|           3407| 2056.279999999994|
|           2751|2031.3100000000184|
|           1964|1850.9700000000053|
|            946|1672.7800000000045|
|           3445| 1640.970000000001|
|           2493|1491.4900000000073|
|           5557|1487.5599999999986|
|           1603|1407.6300000000035|
|           5202|1350.0400000000036|
|            516|1347.8800000000065|
|            726|1320.0600000000036|
|            332|1284.7000000000064|
+---------------+------------------+
only showing top 20 rows



e. Número de órdenes con más de 3 items y con valor pagado por item mayor a 30 euros (fact_item.paid_price_total_eur)

In [69]:
users = sqlContext.sql("SELECT fk_dim_order,count(*)  \
                       FROM fact_item  WHERE paid_price_eur >30 GROUP BY fk_dim_order HAVING COUNT(*) >3  ")
users.count()

8

### Ejercicios PySpark

Elija dos de las preguntas del punto anterior y respóndalas utilizando las funciones de PySpark
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/
