In [1]:
from pyspark import SparkContext, SparkConf
import pandas as pd

In [2]:
import pandas as pd
# Tiny pandas key/value frame built column-wise; the last expression shows the
# distinct keys in order of first appearance.
p_df = pd.DataFrame({"k": ["foo", "bar", "foo"], "v": [1, 2, 3]})
p_df['k'].unique()

array(['foo', 'bar'], dtype=object)

# Getting Started

In [3]:
from pyspark.sql import SparkSession

# Get (or create, on first run) the SparkSession that every following cell uses
# through the name `spark`. No extra .config(...) entries are set: this
# exploration relies on Spark's defaults.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

### How to create a DataFrame in PySpark

In [4]:
# Minimal example: a Spark DataFrame built from local tuples, with columns k and v.
s_df = spark.createDataFrame(data=[("foo", 1), ("bar", 2), ("foo", 3)], schema=['k', 'v'])

### Read a CSV file as a DataFrame and show it

In [5]:
# Load the raw export; header=True takes the column names from the first row.
# No schema is inferred, so every column (e.g. zero-padded codes) stays a string.
df = spark.read.options(header=True).csv("Dati/doc1-2015100810.csv")

In [6]:
df.show(n=20, truncate=True)  # first 20 rows, long values truncated

+--------+----------------+----------+------------+--------------------+
|CODLINHA|       NOMELINHA|CODVEICULO|NUMEROCARTAO|      DATAUTILIZACAO|
+--------+----------------+----------+------------+--------------------+
|     280|N. SRA.DE NAZARÉ|     BC911|  0001430250|07/10/15 07:37:02...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0002470195|07/10/15 07:51:25...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:49...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:51...|
|     000|    OPER S/LINHA|     08047|  0000771305|07/10/15 16:47:16...|
|     000|    OPER S/LINHA|     08047|  0000856665|07/10/15 16:50:18...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002115218|07/10/15 13:18:26...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002178679|07/10/15 14:18:59...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|07/10/15 14:40:56...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|07/10/15 14:41:05...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002202962|

In [7]:
# `df` is the DataFrame read above from Dati/doc1-2015100810.csv through `spark`.
# Print its schema as a tree: column names, types and nullability.
df.printSchema()

root
 |-- CODLINHA: string (nullable = true)
 |-- NOMELINHA: string (nullable = true)
 |-- CODVEICULO: string (nullable = true)
 |-- NUMEROCARTAO: string (nullable = true)
 |-- DATAUTILIZACAO: string (nullable = true)



In [8]:
# Project the DataFrame down to the single "codlinha" column and display it.
df.select(["codlinha"]).show()

+--------+
|codlinha|
+--------+
|     280|
|     280|
|     280|
|     280|
|     000|
|     000|
|     629|
|     629|
|     629|
|     629|
|     629|
|     629|
|     814|
|     814|
|     814|
|     814|
|     629|
|     629|
|     653|
|     653|
+--------+
only showing top 20 rows



In [9]:
df.select([df['codlinha'], df['nomelinha']]).show()  # keep only these two columns

+--------+----------------+
|codlinha|       nomelinha|
+--------+----------------+
|     280|N. SRA.DE NAZARÉ|
|     280|N. SRA.DE NAZARÉ|
|     280|N. SRA.DE NAZARÉ|
|     280|N. SRA.DE NAZARÉ|
|     000|    OPER S/LINHA|
|     000|    OPER S/LINHA|
|     629|  ALTO BOQUEIRÃO|
|     629|  ALTO BOQUEIRÃO|
|     629|  ALTO BOQUEIRÃO|
|     629|  ALTO BOQUEIRÃO|
|     629|  ALTO BOQUEIRÃO|
|     629|  ALTO BOQUEIRÃO|
|     814|       MOSSUNGUÊ|
|     814|       MOSSUNGUÊ|
|     814|       MOSSUNGUÊ|
|     814|       MOSSUNGUÊ|
|     629|  ALTO BOQUEIRÃO|
|     629|  ALTO BOQUEIRÃO|
|     653|          SABARÁ|
|     653|          SABARÁ|
+--------+----------------+
only showing top 20 rows



In [10]:
df.where(df['codlinha'] == "280").show()  # string comparison: codes are read as text

+--------+----------------+----------+------------+--------------------+
|CODLINHA|       NOMELINHA|CODVEICULO|NUMEROCARTAO|      DATAUTILIZACAO|
+--------+----------------+----------+------------+--------------------+
|     280|N. SRA.DE NAZARÉ|     BC911|  0001430250|07/10/15 07:37:02...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0002470195|07/10/15 07:51:25...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:49...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:51...|
|     280|N. SRA.DE NAZARÉ|     BC947|  0001426341|07/10/15 17:55:07...|
|     280|N. SRA.DE NAZARÉ|     BN602|  0001282115|07/10/15 06:29:09...|
|     280|N. SRA.DE NAZARÉ|     BN602|  0002883432|07/10/15 15:19:38...|
|     280|N. SRA.DE NAZARÉ|     BN602|  0002883432|07/10/15 15:19:43...|
|     280|N. SRA.DE NAZARÉ|     BN602|  0003530435|07/10/15 19:22:40...|
|     280|N. SRA.DE NAZARÉ|     BN602|  0002025345|07/10/15 22:58:12...|
|     280|N. SRA.DE NAZARÉ|     BN602|  0003253117|

In [11]:
df.groupby("codlinha").count().show()  # rows per codlinha value (output is not sorted)

+--------+-----+
|codlinha|count|
+--------+-----+
|     467|  608|
|     829| 1098|
|     870| 2228|
|     666|  664|
|     TXA|  711|
|     475| 1390|
|     718|  398|
|     030| 4757|
|     205| 1711|
|     169| 1071|
|     334|  374|
|     TSP|  904|
|     462| 1935|
|     711|  769|
|     272|  864|
|     470|  477|
|     232|  805|
|     635|  365|
|     714|  344|
|     ARA| 8965|
+--------+-----+
only showing top 20 rows



### Running SQL Queries Programmatically

In [22]:
# Expose `df` to SQL as the session-scoped view "people", replacing any previous
# view with that name, then query it back with plain SQL.
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show(n=20)

+--------+----------------+----------+------------+--------------------+
|CODLINHA|       NOMELINHA|CODVEICULO|NUMEROCARTAO|      DATAUTILIZACAO|
+--------+----------------+----------+------------+--------------------+
|     280|N. SRA.DE NAZARÉ|     BC911|  0001430250|07/10/15 07:37:02...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0002470195|07/10/15 07:51:25...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:49...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:51...|
|     000|    OPER S/LINHA|     08047|  0000771305|07/10/15 16:47:16...|
|     000|    OPER S/LINHA|     08047|  0000856665|07/10/15 16:50:18...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002115218|07/10/15 13:18:26...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002178679|07/10/15 14:18:59...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|07/10/15 14:40:56...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|07/10/15 14:41:05...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002202962|

## Parse timestamps and query them with SQL

In [24]:
from pyspark.sql.functions import col, unix_timestamp, to_date, to_timestamp

# Parse the 'dd/MM/yy HH:mm:ss' strings into a TimestampType column. withColumn replaces
# the existing column (matched case-insensitively) and names it 'datautilizacao'.
# unix_timestamp(...).cast("timestamp") already yields a timestamp, so no further
# to_timestamp(...) wrapper is needed.
df_timestamp = df.withColumn(
    'datautilizacao',
    unix_timestamp(col('datautilizacao'), 'dd/MM/yy HH:mm:ss').cast("timestamp"),
)

df_timestamp.show()

df_timestamp.createOrReplaceTempView("people_timestamp")

# Compare against a typed TIMESTAMP literal instead of a bare string: implicit coercion
# rules for timestamp-vs-string comparisons have changed between Spark releases.
sqlDF = spark.sql(
    "SELECT * FROM people_timestamp "
    "WHERE datautilizacao = TIMESTAMP '2015-10-07 16:47:16'"
)
sqlDF.show()

+--------+----------------+----------+------------+-------------------+
|CODLINHA|       NOMELINHA|CODVEICULO|NUMEROCARTAO|     datautilizacao|
+--------+----------------+----------+------------+-------------------+
|     280|N. SRA.DE NAZARÉ|     BC911|  0001430250|2015-10-07 07:37:02|
|     280|N. SRA.DE NAZARÉ|     BC911|  0002470195|2015-10-07 07:51:25|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|2015-10-07 18:49:49|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|2015-10-07 18:49:51|
|     000|    OPER S/LINHA|     08047|  0000771305|2015-10-07 16:47:16|
|     000|    OPER S/LINHA|     08047|  0000856665|2015-10-07 16:50:18|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002115218|2015-10-07 13:18:26|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002178679|2015-10-07 14:18:59|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|2015-10-07 14:40:56|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|2015-10-07 14:41:05|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002202962|2015-10-07 16

### Global Temporary View

Temporary views in Spark SQL are session-scoped and disappear when the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, create a global temporary view. A global temporary view is tied to the system-preserved database `global_temp`, and you must use the qualified name to refer to it, e.g. `SELECT * FROM global_temp.view1`.

In [14]:
# Register the DataFrame as a global temporary view. The "OrReplace" variant keeps the
# cell re-runnable: createGlobalTempView raises if global_temp.people already exists
# (e.g. when this cell is executed a second time in the same Spark application).
df.createOrReplaceGlobalTempView("people")

# Global temporary views live in the system-preserved database `global_temp`,
# so they must be referenced with that qualifier.
spark.sql("SELECT * FROM global_temp.people").show()

+--------+----------------+----------+------------+--------------------+
|CODLINHA|       NOMELINHA|CODVEICULO|NUMEROCARTAO|      DATAUTILIZACAO|
+--------+----------------+----------+------------+--------------------+
|     280|N. SRA.DE NAZARÉ|     BC911|  0001430250|07/10/15 07:37:02...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0002470195|07/10/15 07:51:25...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:49...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:51...|
|     000|    OPER S/LINHA|     08047|  0000771305|07/10/15 16:47:16...|
|     000|    OPER S/LINHA|     08047|  0000856665|07/10/15 16:50:18...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002115218|07/10/15 13:18:26...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002178679|07/10/15 14:18:59...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|07/10/15 14:40:56...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|07/10/15 14:41:05...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002202962|

In [15]:
# A fresh session obtained from `spark` can still read the global view,
# showing that global temporary views are shared across sessions.
other_session = spark.newSession()
other_session.sql("SELECT * FROM global_temp.people").show()

+--------+----------------+----------+------------+--------------------+
|CODLINHA|       NOMELINHA|CODVEICULO|NUMEROCARTAO|      DATAUTILIZACAO|
+--------+----------------+----------+------------+--------------------+
|     280|N. SRA.DE NAZARÉ|     BC911|  0001430250|07/10/15 07:37:02...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0002470195|07/10/15 07:51:25...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:49...|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|07/10/15 18:49:51...|
|     000|    OPER S/LINHA|     08047|  0000771305|07/10/15 16:47:16...|
|     000|    OPER S/LINHA|     08047|  0000856665|07/10/15 16:50:18...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002115218|07/10/15 13:18:26...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002178679|07/10/15 14:18:59...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|07/10/15 14:40:56...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|07/10/15 14:41:05...|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002202962|

## Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into Datasets (DataFrames, in PySpark). The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.

### Inferring the Schema Using Reflection (**TODO**: this did not work and I don't know why; it is not important for this analysis)

## Test with the datetime column split into date and time

I split the last column in two: one for the date (`DATAUTILIZACAO`) and one for the time (`ORAUTILIZACAO`).

In [16]:
# Same kind of export, but with date (DATAUTILIZACAO) and time (ORAUTILIZACAO)
# in separate columns; everything is still read as strings.
df_test = spark.read.options(header=True).csv("Dati/ciao.csv")
df_test.show(n=20)
df_test.printSchema()

+--------+----------------+----------+------------+--------------+-------------+
|CODLINHA|       NOMELINHA|CODVEICULO|NUMEROCARTAO|DATAUTILIZACAO|ORAUTILIZACAO|
+--------+----------------+----------+------------+--------------+-------------+
|     280|N. SRA.DE NAZARÉ|     BC911|  0001430250|      07/10/15|     07:37:02|
|     280|N. SRA.DE NAZARÉ|     BC911|  0002470195|      07/10/15|     07:51:25|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|      07/10/15|     18:49:49|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|      07/10/15|     18:49:51|
|     000|    OPER S/LINHA|     08047|  0000771305|      07/10/15|     16:47:16|
|     000|    OPER S/LINHA|     08047|  0000856665|      07/10/15|     16:50:18|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002115218|      07/10/15|     13:18:26|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002178679|      07/10/15|     14:18:59|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|      07/10/15|     14:40:56|
|     629|  ALTO BOQUEIRÃO| 

## Convert the date column from string to date within the same DataFrame

In [17]:


# Replace the 'dd/MM/yy' date strings with a DateType column, keeping all other columns.
# to_date with an explicit format parses directly, with no unix_timestamp/cast
# round-trip (the same call form as the select-based example further below).
df_datetime = df_test.withColumn('datautilizacao', to_date(col('datautilizacao'), 'dd/MM/yy'))

df_datetime.show()
df_datetime.printSchema()

+--------+----------------+----------+------------+--------------+-------------+
|CODLINHA|       NOMELINHA|CODVEICULO|NUMEROCARTAO|datautilizacao|ORAUTILIZACAO|
+--------+----------------+----------+------------+--------------+-------------+
|     280|N. SRA.DE NAZARÉ|     BC911|  0001430250|    2015-10-07|     07:37:02|
|     280|N. SRA.DE NAZARÉ|     BC911|  0002470195|    2015-10-07|     07:51:25|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|    2015-10-07|     18:49:49|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|    2015-10-07|     18:49:51|
|     000|    OPER S/LINHA|     08047|  0000771305|    2015-10-07|     16:47:16|
|     000|    OPER S/LINHA|     08047|  0000856665|    2015-10-07|     16:50:18|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002115218|    2015-10-07|     13:18:26|
|     629|  ALTO BOQUEIRÃO|     KA603|  0002178679|    2015-10-07|     14:18:59|
|     629|  ALTO BOQUEIRÃO|     KA603|  0000849493|    2015-10-07|     14:40:56|
|     629|  ALTO BOQUEIRÃO| 

## Convert string to date with `select` (the result contains only the converted column)

In [18]:
from pyspark.sql.types import *

from pyspark.sql.functions import to_date

# select() projects ONLY the converted expression, so the result has a single
# DateType column named 'datautilizacao' instead of df_test's six columns.
df_test3 = df_test.select(
    to_date(df_test['datautilizacao'], 'dd/MM/yy').alias('datautilizacao')
)

df_test3.show(n=20)
df_test3.printSchema()

+--------------+
|datautilizacao|
+--------------+
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
|    2015-10-07|
+--------------+
only showing top 20 rows

root
 |-- datautilizacao: date (nullable = true)



## SQL with dates

In [19]:
df_datetime.createOrReplaceTempView("people_test")

# datautilizacao is a DateType column here, so compare it with a typed DATE literal
# rather than relying on implicit string coercion (whose rules differ across Spark versions).
sqlDF = spark.sql("SELECT codlinha FROM people_test WHERE datautilizacao = DATE '2015-10-07'")
sqlDF.show()

df_datetime.show()

+--------+
|codlinha|
+--------+
|     280|
|     280|
|     280|
|     280|
|     000|
|     000|
|     629|
|     629|
|     629|
|     629|
|     629|
|     629|
|     814|
|     814|
|     814|
|     814|
|     629|
|     629|
|     653|
|     653|
+--------+
only showing top 20 rows

+--------+----------------+----------+------------+--------------+-------------+
|CODLINHA|       NOMELINHA|CODVEICULO|NUMEROCARTAO|datautilizacao|ORAUTILIZACAO|
+--------+----------------+----------+------------+--------------+-------------+
|     280|N. SRA.DE NAZARÉ|     BC911|  0001430250|    2015-10-07|     07:37:02|
|     280|N. SRA.DE NAZARÉ|     BC911|  0002470195|    2015-10-07|     07:51:25|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|    2015-10-07|     18:49:49|
|     280|N. SRA.DE NAZARÉ|     BC911|  0003234514|    2015-10-07|     18:49:51|
|     000|    OPER S/LINHA|     08047|  0000771305|    2015-10-07|     16:47:16|
|     000|    OPER S/LINHA|     08047|  0000856665|    2015-10