In [1]:
!pip install spark
!pip install pyspark
!pip install findspark

Collecting spark
  Downloading spark-0.2.1.tar.gz (41 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/41.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.0/41.0 kB[0m [31m1.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: spark
  Building wheel for spark (setup.py) ... [?25l[?25hdone
  Created wheel for spark: filename=spark-0.2.1-py3-none-any.whl size=58748 sha256=b298088fd21ca34b7dde71782c5991213bd8e4b9533223989427fe0513c8ac6e
  Stored in directory: /root/.cache/pip/wheels/63/88/77/b4131110ea4094540f7b47c6d62a649807d7e94800da5eab0b
Successfully built spark
Installing collected packages: spark
Successfully installed spark-0.2.1
Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25

In [2]:
import spark
import pyspark
import findspark

findspark.init()

from spark import *
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

In [32]:
salesOrderHeader = spark.read.option('header','true').option('inferSchema','true').option('delimiter',';').csv('Sales.SalesOrderHeader.csv')
salesTerritory = spark.read.option('header','true').option('inferSchema','true').option('delimiter',';').csv('Sales.SalesTerritory.csv')
salesOrderDetail = spark.read.option('header', 'true').option('inferSchema', 'true').option('delimiter',';').csv('Sales.SalesOrderDetail.csv')
customer = spark.read.option('header', 'true').option('inferSchema', 'true').option('delimiter',';').csv('Sales.Customer.csv')
product = spark.read.option('header', 'true').option('inferSchema', 'true').option('delimiter',';').csv('Production.Product.csv')

In [52]:
salesOrderHeader.join(salesTerritory,'TerritoryID').\
select(
    'SalesOrderID',
    'Status',
    date_format('OrderDate', 'yyyy-MM').alias('Mes'),
    col('Name').alias('NombreTerritorio'),
    when(col('Status') == 1, 1).otherwise(0).alias('TrProceso1'),
    when(col('Status') == 2, 1).otherwise(0).alias('TrAprobadas1'),
    when(col('Status') == 3, 1).otherwise(0).alias('TrAtrasadas1'),
    when(col('Status') == 4, 1).otherwise(0).alias('TrRechazadas1'),
    when(col('Status') == 5, 1).otherwise(0).alias('TrEnviadas1'),
    when(col('Status') == 6, 1).otherwise(0).alias('TrCanceladas1'),
    when(col('Status') == 1, salesOrderHeader['TotalDue']).otherwise(0).alias('MntProceso1'),
    when(col('Status') == 2, salesOrderHeader['TotalDue']).otherwise(0).alias('MntAprobadas1'),
    when(col('Status') == 3, salesOrderHeader['TotalDue']).otherwise(0).alias('MntAtrasadas1'),
    when(col('Status') == 4, salesOrderHeader['TotalDue']).otherwise(0).alias('MntRechazadas1'),
    when(col('Status') == 5, salesOrderHeader['TotalDue'].cast(DecimalType(38,2))).otherwise(0).alias('MntEnviadas1'),
    when(col('Status') == 6, salesOrderHeader['TotalDue']).otherwise(0).alias('MntCanceladas1')
).\
groupBy(
    'Mes',
    'NombreTerritorio'
).\
agg(
    sum('TrProceso1').alias('TrProceso'),
    sum('TrAprobadas1').alias('TrAprobadas'),
    sum('TrAtrasadas1').alias('TrAtrasadas'),
    sum('TrRechazadas1').alias('TrRechazadas'),
    sum('TrEnviadas1').alias('TrEnviadas'),
    sum('TrCanceladas1').alias('TrCanceladas'),
    sum('MntProceso1').alias('MntProceso'),
    sum('MntAprobadas1').alias('MntAprobadas'),
    sum('MntAtrasadas1').alias('MntAtrasadas'),
    sum('MntRechazadas1').alias('MntRechazadas'),
    sum('MntEnviadas1').alias('MntEnviadas'),
    sum('MntCanceladas1').alias('MntCanceladas')
).\
orderBy('Mes', 'NombreTerritorio').\
show()

+-------+----------------+---------+-----------+-----------+------------+----------+------------+----------+------------+------------+-------------+-----------+-------------+
|    Mes|NombreTerritorio|TrProceso|TrAprobadas|TrAtrasadas|TrRechazadas|TrEnviadas|TrCanceladas|MntProceso|MntAprobadas|MntAtrasadas|MntRechazadas|MntEnviadas|MntCanceladas|
+-------+----------------+---------+-----------+-----------+------------+----------+------------+----------+------------+------------+-------------+-----------+-------------+
|2011-05|       Australia|        0|          0|          0|           0|         1|           0|       0.0|         0.0|         0.0|          0.0|    3756.99|          0.0|
|2011-05|          Canada|        0|          0|          0|           0|         8|           0|       0.0|         0.0|         0.0|          0.0|  133887.75|          0.0|
|2011-05|         Central|        0|          0|          0|           0|         4|           0|       0.0|         0.0|    

In [5]:
product_join = salesOrderDetail.join(product, 'ProductID').join(salesOrderHeader, 'SalesOrderID')
customer_join = product_join.join(customer, 'CustomerID')

In [6]:
product_qty = product_join.\
select('ProductID', date_format('OrderDate', 'yyyy-MM').alias('Mes'), 'Status', 'OrderQty').\
where(col('Status') == 5).\
groupBy('Mes','ProductID').\
agg(sum('OrderQty').alias('Cantidad'))

product_espec = Window.partitionBy('Mes').orderBy(desc('Cantidad'))
product_qty = product_qty.withColumn('rn', row_number().over(product_espec)).\
select(
    'Mes',
    when(col('rn') == 1, product_qty['Cantidad']).otherwise(0).alias('Producto_N1'),
    when(col('rn') == 2, product_qty['Cantidad']).otherwise(0).alias('Producto_N2'),
    when(col('rn') == 3, product_qty['Cantidad']).otherwise(0).alias('Producto_N3'),
).\
filter(col('rn') <= 3).\
groupBy('Mes').\
agg(
    sum('Producto_N1').alias('Producto_N1'),
    sum('Producto_N2').alias('Producto_N2'),
    sum('Producto_N3').alias('Producto_N3')
).\
orderBy('Mes')

In [7]:
customer_bill = customer_join.\
select(date_format('OrderDate', 'yyyy-MM').alias('Mes'), 'CustomerID', 'Status', col('TotalDue').cast(DecimalType(38,2)).alias('TotalDue')).\
filter(col('Status') == 5).\
groupBy('Mes', 'CustomerID').\
agg(sum('TotalDue').alias('total_factu'))

customer_spec = Window.partitionBy('Mes').orderBy(desc('total_factu'))
customer_bill = customer_bill.withColumn('rn', row_number().over(customer_spec)).\
select(
    'Mes',
    when(col('rn') == 1, customer_bill['total_factu']).otherwise(0).alias('Cliente_N1'),
    when(col('rn') == 2, customer_bill['total_factu']).otherwise(0).alias('Cliente_N2'),
    when(col('rn') == 3, customer_bill['total_factu']).otherwise(0).alias('Cliente_N3')
).\
filter(col('rn') <= 3).\
groupBy('Mes').\
agg(
    sum('Cliente_N1').alias('Cliente_N1'),
    sum('Cliente_N2').alias('Cliente_N2'),
    sum('Cliente_N3').alias('Cliente_N3')
)

In [8]:
product_qty.join(customer_bill, 'mes').show()

+-------+-----------+-----------+-----------+-----------+-----------+----------+
|    Mes|Producto_N1|Producto_N2|Producto_N3| Cliente_N1| Cliente_N2|Cliente_N3|
+-------+-----------+-----------+-----------+-----------+-----------+----------+
|2011-05|         49|         46|         44| 1214147.76| 1174143.88| 804115.41|
|2011-06|         28|         23|         21|    3953.99|    3953.99|   3953.99|
|2011-07|        134|        114|        103| 2884209.09| 2869162.56|1606960.04|
|2011-08|        167|        137|        113| 2255571.78| 1921769.85|1881760.20|
|2011-09|         27|         23|         23|    3953.99|    3953.99|   3953.99|
|2011-10|        240|        239|        224| 2846244.40| 2655280.25|2600539.48|
|2011-11|         40|         35|         34|    3953.99|    3953.99|   3953.99|
|2011-12|         52|         49|         45| 1158570.40|  907400.34| 649401.36|
|2012-01|        181|        155|        153| 2296157.22| 2034090.03|2020403.40|
|2012-02|         78|       

In [104]:
monthAVG = salesOrderHeader.select('subTotal', date_format('OrderDate','yyyy-MM').alias('Mes')).groupBy('Mes').agg(avg('subTotal').alias('monthAVG')).orderBy('Mes')

groupMonthAVG = salesOrderHeader.join(salesTerritory, 'TerritoryID').select(date_format('OrderDate', 'yyyy-MM').alias('Mes'),'SubTotal', 'Group').groupBy('Mes', 'Group').\
agg(avg('SubTotal').alias('GroupMonthAvg')).orderBy('Mes', 'Group')

monthAVG.join(groupMonthAVG, 'Mes').\
select(
    col('Mes').alias('OrderMonth'),
    col('Group').alias('TerritoryGroup'),
    round('monthAVG',2).alias('MonthAvg'),
    round(col('GroupMonthAvg') - col('MonthAvg'),2).alias('Diff'),
    when(col('GroupMonthAvg') > col('MonthAvg'), 'Above').\
    when(col('GroupMonthAvg') < col('MonthAvg'), 'Below').\
    otherwise('Equal').alias('Indicator')
).orderBy('Mes', 'Group').show()

+----------+--------------+--------+---------+---------+
|OrderMonth|TerritoryGroup|MonthAvg|     Diff|Indicator|
+----------+--------------+--------+---------+---------+
|   2011-05|        Europe|11716.42| -8316.43|    Below|
|   2011-05| North America|11716.42|   405.68|    Above|
|   2011-05|       Pacific|11716.42| -8316.43|    Below|
|   2011-06|        Europe| 3254.69|  -115.34|    Below|
|   2011-06| North America| 3254.69|   -81.78|    Below|
|   2011-06|       Pacific| 3254.69|   126.51|    Above|
|   2011-07|        Europe| 8851.08|  -5703.8|    Below|
|   2011-07| North America| 8851.08|  4603.72|    Above|
|   2011-07|       Pacific| 8851.08| -5578.46|    Below|
|   2011-08|        Europe| 9983.27| -6919.52|    Below|
|   2011-08| North America| 9983.27|  4073.21|    Above|
|   2011-08|       Pacific| 9983.27| -6636.22|    Below|
|   2011-09|        Europe| 3197.92|   -84.42|    Below|
|   2011-09| North America| 3197.92|  -119.01|    Below|
|   2011-09|       Pacific| 319

In [None]:
secuenciales = spark.read.option('header', 'true').option('delimiter', ',').option('inferSchema', 'true').csv('Secuenciales.csv')

In [31]:
sec_espec = Window.partitionBy('codigo').partitionBy('secuencial').orderBy('secuencial', 'secuencial')
# sec_espec = Window.partitionBy('secuencial').partitionBy('codigo').orderBy("secuencial").rowsBetween(Window.unboundedPreceding, Window.currentRow)
# secuenciales.withColumn('lag', lag('codigo', 1).over(sec_espec)).filter(col('lag').isNull()).show()
secuenciales.withColumn('rn', row_number().over(sec_espec)).orderBy("secuencial").show()

+----------+------+---+
|secuencial|codigo| rn|
+----------+------+---+
|         0|  3606|  1|
|         1|  3615|  1|
|         2|  3607|  1|
|         3|  3603|  1|
|         4|  3608|  1|
|         5|  3605|  1|
|         6|  3606|  1|
|         7|  3618|  1|
|         8|  3606|  1|
|         9|  3616|  1|
|        10|  3611|  1|
|        11|  3611|  1|
|        12|  3604|  1|
|        13|  3603|  1|
|        14|  3608|  1|
|        15|  3601|  1|
|        16|  3612|  1|
|        17|  3612|  1|
|        18|  3605|  1|
|        19|  3601|  1|
+----------+------+---+
only showing top 20 rows



In [179]:
res = []
currCod = 0
counter = 0

for value in secuenciales.collect():
  if currCod == value[1]:
    counter += 1
  else:
    counter = 0

  if counter == 3:
    res.append(currCod)

  currCod = value[1]

print(res)

[3620]


In [33]:
person = spark.read.option('header','true').option('inferSchema', 'true').option('delimiter', ';').csv('Person.Person.csv')

In [59]:
customers = customer.join(person, person['BusinessEntityID'] == customer['PersonID']).\
select(
    'CustomerID',
    concat_ws(' ', person.FirstName, person.LastName).alias('NombreCliente')
)

In [119]:
spec_compras = Window.partitionBy('CustomerID').orderBy(desc('OrderDate'))

ultimas_compras = salesOrderHeader\
.withColumn('rn', row_number().over(spec_compras))\
.select(
    'CustomerID',
    when(col('rn') == 1, salesOrderHeader['OrderDate']).alias('UltCompra'),
    when(col('rn') == 2, salesOrderHeader['OrderDate']).alias('PenultCompra'),
    when(col('rn') == 3, salesOrderHeader['OrderDate']).alias('AntepenultCompra')
)\
.filter(
    col('rn') <= 3
)\
.filter(
    col('Status') == 5
)\
.groupBy('CustomerID')\
.agg(
    max('UltCompra').alias('UltCompra'),
    max('PenultCompra').alias('PenultCompra'),
    max('AntepenultCompra').alias('AntepenultCompra')
)

In [131]:
ultimas_compras.join(customers, 'CustomerID')\
.select(
    date_format('UltCompra', 'yyyy-MM').alias('Mes'),
    'NombreCliente',
    col('UltCompra').alias('FechaUltCompra'),
    coalesce(date_diff('UltCompra', 'PenultCompra'), lit(0)).alias('DiasUC_PC'),
    coalesce(date_diff('PenultCompra', 'AntepenultCompra'), lit(0)).alias('DiasPC_APC')
)\
.orderBy(col('Mes').desc(), 'NombreCliente')\
.show()

+-------+-------------------+-------------------+---------+----------+
|    Mes|      NombreCliente|     FechaUltCompra|DiasUC_PC|DiasPC_APC|
+-------+-------------------+-------------------+---------+----------+
|2014-06|    Aaron Henderson|2014-06-04 00:00:00|        0|         0|
|2014-06|       Aaron Sharma|2014-06-02 00:00:00|       19|         0|
|2014-06|     Abigail Barnes|2014-06-11 00:00:00|        0|         0|
|2014-06|        Adrian Bell|2014-06-20 00:00:00|      183|         0|
|2014-06|     Adrian Stewart|2014-06-27 00:00:00|      182|         0|
|2014-06|       Adriana Rana|2014-06-17 00:00:00|        0|         0|
|2014-06|        Aidan Hayes|2014-06-03 00:00:00|        0|         0|
|2014-06|           Alan Sun|2014-06-14 00:00:00|        0|         0|
|2014-06|          Alan Zhou|2014-06-14 00:00:00|      198|         0|
|2014-06|      Albert Castro|2014-06-30 00:00:00|        0|         0|
|2014-06|      Alberto Gomez|2014-06-27 00:00:00|        0|         0|
|2014-