In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(conf = SparkConf().setAppName('Demo SparkSQL'))
sqlc = SQLContext(sc)
sqlc

<pyspark.sql.context.SQLContext at 0x7f7b140fbdd8>

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Demo SparkSQL').getOrCreate()

# Création de DataFrame

## à partir de RDD

In [3]:
data = [['Machine Learning', 1.0],['Big Data', 1.5], ['DataWarehouse', 1.5]]
rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd)

In [4]:
df.collect()

[Row(_1='Machine Learning', _2=1.0),
 Row(_1='Big Data', _2=1.5),
 Row(_1='DataWarehouse', _2=1.5)]

In [5]:
spark.createDataFrame(rdd, schema="module string, coef float").collect()

[Row(module='Machine Learning', coef=1.0),
 Row(module='Big Data', coef=1.5),
 Row(module='DataWarehouse', coef=1.5)]

In [6]:
spark.createDataFrame(rdd, schema=["module", "coef"]).collect()

[Row(module, coef='Machine Learning', _2=1.0),
 Row(module, coef='Big Data', _2=1.5),
 Row(module, coef='DataWarehouse', _2=1.5)]

In [7]:
from pyspark.sql import Row
spark.createDataFrame([Row(module='Machine Learning', coef=1.0), Row(module='Big Data', coef=1.5), Row(module='DataWarehouse', coef=1.5)]).collect()

[Row(coef=1.0, module='Machine Learning'),
 Row(coef=1.5, module='Big Data'),
 Row(coef=1.5, module='DataWarehouse')]

## Lecture CSV

In [8]:
spark.read.csv('zipcodes.csv', header=True).take(2)

[Row(zip_code='00501', latitude='40.922326', longitude='-72.637078', city='Holtsville', state='NY', county='Suffolk'),
 Row(zip_code='00544', latitude='40.922326', longitude='-72.637078', city='Holtsville', state='NY', county='Suffolk')]

# Commandes SQL

In [10]:
zipcodes = spark.read.csv('zipcodes.csv', header=True).cache()

In [11]:
zipcodes.createOrReplaceTempView('zip_table')

In [12]:
spark.sql("SELECT latitude, longitude FROM zip_table WHERE state='NY' and zip_code<10000").collect()

[Row(latitude='40.922326', longitude='-72.637078'),
 Row(latitude='40.922326', longitude='-72.637078'),
 Row(latitude='40.992288', longitude='-72.723496')]

# Opérations sur les DataFrames

## Opérations pour les clauses SQL

In [13]:
zipcodes.where((zipcodes.state=='NY') & (zipcodes.zip_code<10000)).select(zipcodes.latitude, 'longitude').collect()

[Row(latitude='40.922326', longitude='-72.637078'),
 Row(latitude='40.922326', longitude='-72.637078'),
 Row(latitude='40.992288', longitude='-72.723496')]

In [14]:
zipcodes.agg({'latitude':'mean', 'longitude':'max'}).collect()

[Row(avg(latitude)=38.49921866596052, max(longitude)='166.410291')]

## Autres opérations

In [15]:
zipcodes.show(5)

+--------+---------+----------+----------+-----+---------+
|zip_code| latitude| longitude|      city|state|   county|
+--------+---------+----------+----------+-----+---------+
|   00501|40.922326|-72.637078|Holtsville|   NY|  Suffolk|
|   00544|40.922326|-72.637078|Holtsville|   NY|  Suffolk|
|   00601|18.165273|-66.722583|  Adjuntas|   PR| Adjuntas|
|   00602|18.393103|-67.180953|    Aguada|   PR|   Aguada|
|   00603|18.455913| -67.14578| Aguadilla|   PR|Aguadilla|
+--------+---------+----------+----------+-----+---------+
only showing top 5 rows



In [16]:
zipcodes.printSchema()

root
 |-- zip_code: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)



In [17]:
zipcodes.describe('latitude').collect()

[Row(summary='count', latitude='42049'),
 Row(summary='mean', latitude='38.49921866596052'),
 Row(summary='stddev', latitude='5.414030114734146'),
 Row(summary='min', latitude='-7.209975'),
 Row(summary='max', latitude='70.494693')]

In [18]:
from pyspark.sql import Column
from pyspark.sql import functions as F
zipcodes = zipcodes.withColumn('Region', F.when(zipcodes.latitude<38.852334,'South').otherwise('North'))

In [19]:
zipcodes.show(5)

+--------+---------+----------+----------+-----+---------+------+
|zip_code| latitude| longitude|      city|state|   county|Region|
+--------+---------+----------+----------+-----+---------+------+
|   00501|40.922326|-72.637078|Holtsville|   NY|  Suffolk| North|
|   00544|40.922326|-72.637078|Holtsville|   NY|  Suffolk| North|
|   00601|18.165273|-66.722583|  Adjuntas|   PR| Adjuntas| South|
|   00602|18.393103|-67.180953|    Aguada|   PR|   Aguada| South|
|   00603|18.455913| -67.14578| Aguadilla|   PR|Aguadilla| South|
+--------+---------+----------+----------+-----+---------+------+
only showing top 5 rows



In [20]:
zipcodes = zipcodes.drop('Region')

In [21]:
zipcodes.printSchema()

root
 |-- zip_code: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)

