In [1]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("python spark SQL basic example")\
        .config("spark.some.config.option", "some-value")\
        .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/09 16:25:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
flightData2015 = (spark
                  .read
                  # averigua automaticamente el tipo de datos de la col
                  .option("inferSchema", "true")
                  # la primera fila del archivo son nombres de la columna
                  .option("header", "true")
                  .csv("./2015-summary.csv"))

In [3]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [4]:
#explai() no muestra la ejecución, solo muestra el plan para realizar la tarea sort() / lazy
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=33]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/antonio/Escritorio/portfolio/notebooks/spark-warehouse/2015..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [5]:
#reducimos la particion de salidas en solo 5 en lugar de las 200 por defecto
spark.conf.set("spark.sql.shuffle.partition", "5")

#take is like head()
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [6]:
# convertimos cualquier Dataframe a tabla SQL
flightData2015.createOrReplaceTempView("flight_data_2015")

sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1) FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
""")

sqlWay.explain()
sqlWay.show(3)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=55]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/antonio/Escritorio/portfolio/notebooks/spark-warehouse/2015..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


+-----------------+--------+
|DEST_COUNTRY_NAME|count(1)|
+-----------------+--------+
|         Anguilla|       1|
|           Russia|       1|
|         Paraguay|       1|
+-----------------+--------+
only showing top 3 rows


In [7]:
dataFrameWay = (flightData2015
                .groupBy("DEST_COUNTRY_NAME")
                .count())
dataFrameWay.explain()
dataFrameWay.show(3)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=114]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/antonio/Escritorio/portfolio/notebooks/spark-warehouse/2015..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|         Anguilla|    1|
|           Russia|    1|
|         Paraguay|    1|
+-----------------+-----+
only showing top 3 rows


In [8]:
spark.sql("SELECT MAX(COUNT) FROM flight_data_2015").take(1)

[Row(max(COUNT)=370002)]

In [9]:
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [10]:
maxsql = spark.sql("""
SELECT DEST_COUNTRY_NAME, SUM(count) as destination_total FROM flight_data_2015
        GROUP BY DEST_COUNTRY_NAME
            ORDER BY SUM(count) DESC
                   LIMIT 5
                   """)

maxsql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [11]:
from pyspark.sql.functions import desc

(flightData2015
 .groupBy("DEST_COUNTRY_NAME")
 .sum("count")
 .withColumnRenamed("sum(count)", "destination_total")
 .sort(desc("destination_total"))
 .limit(5)
 .show())

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [12]:
staticDataFrame = (spark.read.format("csv")
                   .option("header", "true")
                   .option("inferSchema", "true")
                   .load("./csv/online-retail-dataset.csv"))

#creamos vista temporal en sqp
staticDataFrame.createOrReplaceTempView("retail_data")
# Guardamos la estructura Schema de df para reusarlo en modo streaming
staticSchema = staticDataFrame.schema

staticDataFrame.show(5)

                                                                                

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows


In [13]:
from pyspark.sql.functions import window, column, desc, col, to_timestamp

# Convertimos el tipo de fecha ya que window requiere col tipo timestamp
staticDataFrame = staticDataFrame.withColumn(
    "InvoiceDate", to_timestamp("InvoiceDate", "M/d/yyyy H:mm")
)

#col -> nombres de columna
#window() agrupa los datos en intervalos de tiempo (dias)
#selectExpr -> permite escribir formulas como si fuera sql
(staticDataFrame
 .selectExpr("CustomerID", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
         .groupBy(
             col("CustomerID"), window(col("InvoiceDate"), "1 day"))
        .sum("total_cost")
        .show(5))




+----------+--------------------+------------------+
|CustomerID|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|     17396|{2010-12-02 19:00...|             397.5|
|     12725|{2010-12-02 19:00...|427.79999999999995|
|     14404|{2010-12-04 19:00...|133.48000000000002|
|     18092|{2010-12-08 19:00...|1485.7200000000003|
|     17530|{2010-12-09 19:00...| 300.6399999999999|
+----------+--------------------+------------------+
only showing top 5 rows


                                                                                

In [14]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [15]:
#.readStream -> espera que lleguen datos poco a poco
#.schema -> que col y tipos de datos esperar
#maxFilesPerTrigger -> procesa solo un archivo poco a poco
streamingDataFrame = (spark.readStream
                      .schema(staticSchema)
                      .option("maxFilesPerTrigger", 1)
                      .format("csv")
                      .option("header", "true")
                      .load("./csv/"))

streamingDataFrame.isStreaming

True

In [16]:
purchaseByCustomerPerHour = (streamingDataFrame
                             .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost",
                                         "InvoiceDate")
                                .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
                                .sum("total_cost"))



In [17]:
(purchaseByCustomerPerHour.writeStream
 .format("memory")
 .queryName("customer_purchases")
 .outputMode("complete")
 .start)

<bound method DataStreamWriter.start of <pyspark.sql.streaming.readwriter.DataStreamWriter object at 0x7f6da9668880>>

In [18]:
(purchaseByCustomerPerHour.writeStream
.format("console")
.queryName("customer_purchases_2")
.outputMode("complete")
.start())

25/07/09 16:25:30 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-5edb17b7-9308-4721-8dca-e3c0f3c9a2a0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/07/09 16:25:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f6da9668a90>

25/07/09 16:25:31 ERROR Executor: Exception in task 1.0 in stage 28.0 (TID 36)
org.apache.spark.SparkDateTimeException: [CAST_INVALID_INPUT] The value '1/27/2011 18:05' of the type "STRING" cannot be cast to "TIMESTAMP" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"window" was called from
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

	at org.apache.spark.sql.errors.ExecutionErrors.invalidInputInCastToDatetimeErrorInternal(ExecutionErrors.scala:115)
	at org.apache.spark.sql.errors.ExecutionErrors.invalidInputInCastToDatetimeErrorInternal$(ExecutionErrors.scala:102)
	at org.apache.spark.sql.errors.ExecutionErrors$.invalidInputInCastToDatetimeErrorInternal(ExecutionErrors.scala:259)
	at org.apache.spark.sql.errors.ExecutionErrors.invalidInputInCastToDatetimeError(ExecutionErrors.scala:92)
	at org.apache.spark.sql.

In [19]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [28]:
#Need to conver numerical
from pyspark.sql.functions import date_format, col

preppedDataFrame = (staticDataFrame
                    #Rellenamos con 0 valores nulos
                    .na.fill(0)
                    #creamos nueva columna y colocamos nombre de del dia "monday..."
                    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
                    #reducimos el numero de particiones a 5
                    .coalesce(5))


In [31]:
#split data into training and test set
trainDataFrame = (preppedDataFrame
                  .where("InvoiceDate < '2011-07-01'"))

testDataFrame = (preppedDataFrame
                .where("InvoiceDate >= '2011-07-01'"))

trainDataFrame.count()
testDataFrame.count()

                                                                                

296006

In [None]:
from pyspark.ml.feature import StringIndexer
#StringIndexer = LabelEncoder --> convierte valores de texto como "monday" en número
indexer = (StringIndexer()
           #Columna a convertir
           .setInputCol("day_of_week")
           #Define como se llamara la nueva columna
           .setOutputCol("day_of_week_index"))


In [None]:
#utilizamos a onehotencoder para tener variables separadas
# de lo contrario el modelo pensara que lunes > a martes, como fuesen cantidades
from pyspark.ml.feature import OneHotEncoder
encoder = (OneHotEncoder()
           .setInputCol("day_of_week_index")
           .setOutputCol("day_of_week_encoded"))

In [None]:
from pyspark.ml.feature import VectorAssembler

# Unimos todas las columnas numericas en un solo vector, col feature
vectorAssembler = (VectorAssembler()
                   .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])
                   .setOutputCol("features"))

In [44]:
from pyspark.ml import Pipeline

transformationPipeline = (Pipeline()
                          .setStages([indexer, encoder, vectorAssembler]))

#train data frame
fittedPipeline = transformationPipeline.fit(trainDataFrame)


transformedTraining = fittedPipeline.transform(trainDataFrame)

                                                                                

In [47]:
from pyspark.ml.clustering import KMeans

kmeans = (KMeans()
          .setK(29) # numero de cluisters
          .setSeed(1)) # semilla aleatorio

In [53]:
kmModel = kmeans.fit(transformedTraining)

print("Training Cost (WSSSE):", kmModel.summary.trainingCost)




Training Cost (WSSSE): 37035449.850086644


                                                                                