In [1]:
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

spark = SparkSession\
    .builder\
    .master("local")\
    .appName("Introduction au DataFrame")\
    .getOrCreate()

spark

22/03/31 08:07:02 WARN Utils: Your hostname, Donors-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.2 instead (on interface en0)
22/03/31 08:07:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/31 08:07:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/31 08:07:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Création DataFrame

### Depuis un RDD

In [2]:
rdd = sc.textFile('alaska_airlines.csv')
rdd_row = rdd.map(lambda x: Row(annee=x[0], mois=x[1], jours=x[2], flightNum=x[3]))
df = spark.createDataFrame(rdd_row)
df.show(5)

                                                                                

+-----+----+-----+---------+
|annee|mois|jours|flightNum|
+-----+----+-----+---------+
|    a|   n|    n|        e|
|    2|   0|    0|        8|
|    2|   0|    0|        8|
|    2|   0|    0|        8|
|    2|   0|    0|        8|
+-----+----+-----+---------+
only showing top 5 rows



### Depuis un CSV

In [3]:
raw_df = spark.read.csv('alaska_airlines.csv',header=True)
raw_df.printSchema()
raw_df.columns

root
 |-- annee: string (nullable = true)
 |-- mois: string (nullable = true)
 |-- jours: string (nullable = true)
 |-- heure: string (nullable = true)
 |-- x1: string (nullable = true)
 |-- x2: string (nullable = true)
 |-- x3: string (nullable = true)
 |-- x4: string (nullable = true)
 |-- region: string (nullable = true)
 |-- passengers: string (nullable = true)
 |-- flightNum: string (nullable = true)
 |-- y1: string (nullable = true)
 |-- y2: string (nullable = true)
 |-- y3: string (nullable = true)
 |-- y4: string (nullable = true)
 |-- y5: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- cancellations: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- cancelled: string (nullable = true)
 |-- z1: string (nullable = true)
 |-- z2: string (nullable = true)
 |-- z3: string (nullable = true)
 |-- z4: string (nullable = true)
 |-- z5: string (nullable = true)



['annee',
 'mois',
 'jours',
 'heure',
 'x1',
 'x2',
 'x3',
 'x4',
 'region',
 'passengers',
 'flightNum',
 'y1',
 'y2',
 'y3',
 'y4',
 'y5',
 'origin',
 'dest',
 'distance',
 'cancellations',
 'delay',
 'cancelled',
 'z1',
 'z2',
 'z3',
 'z4',
 'z5']

## Manipulation

In [4]:
flights_1 = raw_df.select('origin', 'dest', 'cancelled', 'x1','x2','x3','x4')
flights_1.show(10)

+------+----+---------+----+----+----+----+
|origin|dest|cancelled|  x1|  x2|  x3|  x4|
+------+----+---------+----+----+----+----+
|   SEA| SJC|        0|2057|2052|2312|2258|
|   SEA| PSP|        0| 703| 715| 958| 951|
|   SAN| SEA|        0|2011|1846|2248|2145|
|   SEA| GEG|        0|2301|2300|2354|2359|
|   TUS| SEA|        0|1221|1221|1422|1438|
|   LAX| SEA|        0|1843|1840|2110|2125|
|   LAX| SEA|        0|2045|2045|2314|2330|
|   ANC| PDX|        0|  49|  50| 547| 523|
|   LAS| SEA|        0|1719|1715|1939|1956|
|   SJC| SEA|        0| 613| 630| 815| 844|
+------+----+---------+----+----+----+----+
only showing top 10 rows



In [5]:
raw_df.select('dest').distinct().count()

                                                                                

51

In [6]:
flights_1.describe().show(truncate = 5)



+-------+------+-----+---------+-----+-----+-----+-----+
|summary|origin| dest|cancelled|   x1|   x2|   x3|   x4|
+-------+------+-----+---------+-----+-----+-----+-----+
|  count| 15...|15...|    15...|15...|15...|15...|15...|
|   mean|  null| null|    0....|13...|13...|14...|14...|
|  st...|  null| null|    0....|51...|50...|53...|52...|
|    min|   ADK|  ADK|        0|    1|  100|    1|    1|
|    max|   YAK|  YAK|        1|   NA|  958|   NA|  959|
+-------+------+-----+---------+-----+-----+-----+-----+



                                                                                

In [7]:
flights_1.describe().toPandas()

                                                                                

Unnamed: 0,summary,origin,dest,cancelled,x1,x2,x3,x4
0,count,151102,151102,151102.0,151102.0,151102.0,151102.0,151102.0
1,mean,,,0.014156000582388,1333.6065953390969,1332.4152956281189,1480.40793920056,1489.5053209090547
2,stddev,,,0.1181342481644047,511.2859647572253,500.7152101927607,539.9724262180633,525.0600265528899
3,min,ADK,ADK,0.0,1.0,100.0,1.0,1.0
4,max,YAK,YAK,1.0,,958.0,,959.0


In [8]:
# flights_1.groupBy('origin').count().show()
flights_1.groupBy('origin', 'dest').count().show()



+------+----+-----+
|origin|dest|count|
+------+----+-----+
|   WRG| PSG|  364|
|   ADQ| ANC|  706|
|   SEA| RNO|  112|
|   SCC| BRW|  364|
|   DEN| SEA| 1422|
|   RNO| SEA|  112|
|   PSP| SEA|  664|
|   DFW| SEA| 1083|
|   SJC| PDX|  964|
|   SEA| LAX| 4681|
|   SNA| PDX| 1418|
|   SIT| JNU|  740|
|   PDX| PHX| 1081|
|   SMF| PDX|  102|
|   ANC| ADK|  102|
|   FAI| ANC| 2853|
|   PDX| SFO|  833|
|   ANC| OME|  365|
|   BOS| SEA|  618|
|   KTN| WRG|  364|
+------+----+-----+
only showing top 20 rows



                                                                                

In [9]:
flights_1.filter(flights_1.origin == 'ADQ').show()

+------+----+---------+----+----+----+----+
|origin|dest|cancelled|  x1|  x2|  x3|  x4|
+------+----+---------+----+----+----+----+
|   ADQ| ANC|        0|1816|1817|1904|1910|
|   ADQ| ANC|        0| 845| 853| 936| 948|
|   ADQ| ANC|        0| 839| 853| 929| 948|
|   ADQ| ANC|        0|1810|1817|1900|1910|
|   ADQ| ANC|        0|1820|1817|1913|1910|
|   ADQ| ANC|        0| 856| 853| 949| 948|
|   ADQ| ANC|        1|  NA|1817|  NA|1910|
|   ADQ| ANC|        0| 845| 853| 935| 948|
|   ADQ| ANC|        0| 906| 853|1001| 948|
|   ADQ| ANC|        0|1904|1817|1956|1910|
|   ADQ| ANC|        0|1819|1817|1915|1910|
|   ADQ| ANC|        0| 843| 853| 938| 948|
|   ADQ| ANC|        0| 855| 853| 945| 948|
|   ADQ| ANC|        0|1811|1817|1902|1910|
|   ADQ| ANC|        0|1817|1817|1911|1910|
|   ADQ| ANC|        0| 844| 853| 932| 948|
|   ADQ| ANC|        0|2102|1817|2151|1910|
|   ADQ| ANC|        0| 839| 853| 928| 948|
|   ADQ| ANC|        0|1840|1817|1928|1910|
|   ADQ| ANC|        0| 940| 853

## Création et aggrégation de variables

In [10]:
raw_df.withColumn('isLongFlight', raw_df.distance > 1000).select('distance', 'isLongFlight').show(10)

+--------+------------+
|distance|isLongFlight|
+--------+------------+
|     697|       false|
|     987|       false|
|    1050|        true|
|     224|       false|
|    1216|        true|
|     954|       false|
|     954|       false|
|    1542|        true|
|     866|       false|
|     697|       false|
+--------+------------+
only showing top 10 rows



## Valeurs manquantes

In [11]:
raw_df.fillna('White Tiger', 'z1').show(10)

+-----+----+-----+-----+----+----+----+----+------+----------+---------+---+---+---+---+---+------+----+--------+-------------+-----+---------+-----------+---+---+---+---+
|annee|mois|jours|heure|  x1|  x2|  x3|  x4|region|passengers|flightNum| y1| y2| y3| y4| y5|origin|dest|distance|cancellations|delay|cancelled|         z1| z2| z3| z4| z5|
+-----+----+-----+-----+----+----+----+----+------+----------+---------+---+---+---+---+---+------+----+--------+-------------+-----+---------+-----------+---+---+---+---+
| 2008|   1|    1|    2|2057|2052|2312|2258|    AS|       324|   N306AS|135|126|112| 14|  5|   SEA| SJC|     697|            7|   16|        0|White Tiger|  0| NA| NA| NA|
| 2008|   1|    1|    2| 703| 715| 958| 951|    AS|       572|   N302AS|175|156|144|  7|-12|   SEA| PSP|     987|            6|   25|        0|White Tiger|  0| NA| NA| NA|
| 2008|   1|    1|    2|2011|1846|2248|2145|    AS|       511|   N564AS|157|179|136| 63| 85|   SAN| SEA|    1050|            7|   14|       

22/03/31 08:15:39 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [12]:
raw_df.replace(['AS', 'TUS'], ['as', 'tus'])

DataFrame[annee: string, mois: string, jours: string, heure: string, x1: string, x2: string, x3: string, x4: string, region: string, passengers: string, flightNum: string, y1: string, y2: string, y3: string, y4: string, y5: string, origin: string, dest: string, distance: string, cancellations: string, delay: string, cancelled: string, z1: string, z2: string, z3: string, z4: string, z5: string]

## Requêtes SQL

In [47]:
raw_df.createOrReplaceTempView('flightsView')
sqlDF = spark.sql('SELECT origin, dest FROM flightsView')
sqlDF.show(10)

+------+----+
|origin|dest|
+------+----+
|   SEA| SJC|
|   SEA| PSP|
|   SAN| SEA|
|   SEA| GEG|
|   TUS| SEA|
|   LAX| SEA|
|   LAX| SEA|
|   ANC| PDX|
|   LAS| SEA|
|   SJC| SEA|
+------+----+
only showing top 10 rows



## Sample et astuces d'affichage

In [52]:
raw_df.sample(withReplacement=False, fraction=.0001, seed=21).toPandas()

Unnamed: 0,annee,mois,jours,heure,x1,x2,x3,x4,region,passengers,...,dest,distance,cancellations,delay,cancelled,z1,z2,z3,z4,z5
0,2008,3,7,5,1152,1214,1240,1301,AS,61,...,CDV,213,5,7,0,,0,,,
1,2008,3,13,4,1007,945,1210,1151,AS,386,...,OAK,671,5,16,0,,0,19.0,0.0,0.0
2,2008,3,22,6,1212,1215,1402,1358,AS,312,...,SJC,569,6,17,0,,0,,,
3,2008,4,7,1,1932,1942,2130,2143,AS,541,...,SEA,671,4,9,0,,0,,,
4,2008,4,15,2,604,610,848,855,AS,712,...,PHX,1107,6,18,0,,0,,,
5,2008,6,15,7,710,715,935,944,AS,582,...,SNA,859,5,10,0,,0,,,
6,2008,6,19,4,2204,2210,38,59,AS,133,...,ANC,1542,4,16,0,,0,,,
7,2008,7,14,1,1342,1348,1656,1707,AS,76,...,SEA,909,8,11,0,,0,,,
8,2008,7,21,1,1351,1345,1539,1530,AS,402,...,SJC,569,4,14,0,,0,,,
9,2008,8,15,5,659,700,937,948,AS,494,...,SAN,1050,6,10,0,,0,,,


In [53]:
sc.stop()