# pyspark.sql.dataframe

#### 0 - getting lab data

In [1]:
import pandas as pd 
pandas_df_transactions=pd.read_csv('transactions.txt',sep="\t")
pandas_df_magasins=pd.read_csv('magasins.txt',sep="\t")

In [2]:
df_transactions=spark.createDataFrame(pandas_df_transactions)
df_transactions.printSchema()
df_transactions.first()

root
 |-- transaction_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- magasin: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: long (nullable = true)



Row(transaction_id=0, date=u'2016-06-01', magasin=39, quantity=23, unit_price=12)

df_magasins=spark.createDataFrame(pandas_df_magasins)
df_magasins.printSchema()
df_magasins.first()

### a - basic Dataframe operations

In [4]:
df_trs=df_transactions
df_trs

DataFrame[transaction_id: bigint, date: string, magasin: bigint, quantity: bigint, unit_price: bigint]

#### a - 0 actions (not lazy, launch all previous non cached transformations leading to current DF)

In [8]:
# first 
print df_trs.first()

Row(transaction_id=0, date=u'2016-06-01', magasin=39, quantity=23, unit_price=12)


In [11]:
print df_trs.first().date
print df_trs.first().asDict()


2016-06-01
{'date': u'2016-06-01', 'quantity': 23, 'unit_price': 12, 'transaction_id': 0, 'magasin': 39}


In [12]:
# count 
df_trs.count()

468

In [13]:
#toPandas
df_trs.filter('date="2016-06-01"').toPandas()

Unnamed: 0,transaction_id,date,magasin,quantity,unit_price
0,0,2016-06-01,39,23,12
1,234,2016-06-01,12,61,4


In [15]:
#collect all elements as list of rows
df_trs.filter('date="2016-06-01"').collect()

[Row(transaction_id=0, date=u'2016-06-01', magasin=39, quantity=23, unit_price=12),
 Row(transaction_id=234, date=u'2016-06-01', magasin=12, quantity=61, unit_price=4)]

In [18]:
df_trs.show(5)  # default: 20 rows

+--------------+----------+-------+--------+----------+
|transaction_id|      date|magasin|quantity|unit_price|
+--------------+----------+-------+--------+----------+
|             0|2016-06-01|     39|      23|        12|
|             1|2016-06-02|      2|       9|        13|
|             2|2016-06-03|     41|      69|         5|
|             3|2016-06-04|     32|      23|         4|
|             4|2016-06-05|     37|       4|         2|
+--------------+----------+-------+--------+----------+
only showing top 5 rows



In [31]:
# take(n) : list of n first elements
df_trs.take(3)

[Row(transaction_id=0, date=u'2016-06-01', magasin=39, quantity=23, unit_price=12),
 Row(transaction_id=1, date=u'2016-06-02', magasin=2, quantity=9, unit_price=13),
 Row(transaction_id=2, date=u'2016-06-03', magasin=41, quantity=69, unit_price=5)]

##### NB : saving data to storage :
##### way : 1 - use write to get lazily a DataFrameWriter to write then unlazily to used distributed storage 


In [33]:
#get DataFrameWriter to write parquet to used distributed storage in a given data format (csv, parquet, json, avro, ...)
print type(df_trs.write)
# df_trs.write.csv('path/to/my_file/in/the/used/disributed/storage/system/my_file.csv')
# df_trs.write.json('path/to/my_file/in/the/used/disributed/storage/system/my_file.json')
# df_trs.write.parquet('path/to/my_file/in/the/used/disributed/storage/system/my_file.parquet')



<class 'pyspark.sql.readwriter.DataFrameWriter'>


##### way : 2 - for small datasets its possible to get unlazily the DF data into the driver (toPandas() ) and write then to driver's machine storage using pyhton

In [34]:
df_pandas=df_trs.toPandas()

In [35]:
df_pandas.to_csv('transactions_without_header.csv',sep='\t',header=False,index=False)

##### a - 1 - Selecting columns (lazy)

In [36]:
# selecting string expression of columns
df1=df_trs.select("transaction_id","date",'magasin')

In [37]:
# selecting pyspark.sql.Column type (which can be manipulated with column functions)
df2=df_trs.select(
                    df_trs["transaction_id"], 
                    ( df_trs["quantity"]* df_trs["unit_price"] ).alias("CA") ,
                    df_trs["date"], 
                )



In [38]:
#non recomended as a field can be a df property
df3=df_trs.select(df_trs.transaction_id,df_trs.date )
                

In [39]:
df1.printSchema()    
df2.printSchema()    
df3.printSchema()    


root
 |-- transaction_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- magasin: long (nullable = true)

root
 |-- transaction_id: long (nullable = true)
 |-- CA: long (nullable = true)
 |-- date: string (nullable = true)

root
 |-- transaction_id: long (nullable = true)
 |-- date: string (nullable = true)



In [40]:
#select expression
df_trs.selectExpr("transaction_id",
                "quantity*unit_price as CA"
               ).printSchema()


root
 |-- transaction_id: long (nullable = true)
 |-- CA: long (nullable = true)



In [41]:
exprr=["if (magasin=0,unit_price,unit_price+2) as unit_price_taxed",  "quantity*unit_price as CA","*"]
df_trs.selectExpr(exprr).printSchema()

root
 |-- unit_price_taxed: long (nullable = true)
 |-- CA: long (nullable = true)
 |-- transaction_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- magasin: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: long (nullable = true)



##### a - 2 - filtering on columns (lazy)

In [42]:
# filtering via a condition as a string of SQL expression
df_filtered_trascations1=df_trs.filter(
                                        "(magasin=0 or magasin=1) and (quantity>5 or unit_price>10)"
                                        )
df_filtered_trascations2=df_trs.filter(
                                        "(magasin in (0,1)) and (quantity>5 or unit_price>10)"
                                        )

In [44]:
print df_trs.count()
print df_filtered_trascations1.count()
print df_filtered_trascations2.count()

468
20
20


In [45]:
# filtering via a condition as a Column of type  pyspark.sql.types.BooleanType 
print type(df_trs.magasin)
print type(df_trs.magasin<2)
print type(df_trs.magasin.isin(0,1))

<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.column.Column'>


In [46]:
df_filtered_trascations1= df_trs.filter(
                                    (df_trs['magasin'].isin(0,1)) & (
                                                                        (
                                                                            (df_trs['quantity']>5) | (df_trs['unit_price']>10)
                                                                         )
                                                                    )
                        )



In [47]:
print df_trs.count()
print df_filtered_trascations1.count()

468
20


In [54]:
df_filtered_trascations_tmp= df_trs.select('*',
                                                
                                                (  (df_trs['magasin'].isin(0,1)) & (   
                                                                                        # how to use
                                                                                       ~ (
                                                                                            (~(df_trs['quantity']>5)) &
                                                                                            (~(df_trs['unit_price']>10))
                                                                                         )
                                                                                    )
                                                ).alias('boolean_filter')
                        )

df_filtered_trascations_2_1=df_filtered_trascations_tmp.filter('boolean_filter')
df_filtered_trascations_2_2=df_filtered_trascations_tmp.filter('boolean_filter=true')
df_filtered_trascations_3_1=df_filtered_trascations_tmp.filter(df_filtered_trascations_tmp['boolean_filter'])
df_filtered_trascations_3_2=df_filtered_trascations_tmp.filter(df_filtered_trascations_tmp['boolean_filter']==True)

In [55]:
# la condition donnée à filter en première ligne n'est autre qu'une colonne de type boolean (testé en la créant avec un select)
print type(df_filtered_trascations_tmp.boolean_filter)
print '\n'
df_filtered_trascations_tmp.printSchema()


<class 'pyspark.sql.column.Column'>


root
 |-- transaction_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- magasin: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: long (nullable = true)
 |-- boolean_filter: boolean (nullable = true)



In [56]:
print df_trs.count()
print df_filtered_trascations_2_1.count()
print df_filtered_trascations_2_2.count()
print df_filtered_trascations_3_1.count()
print df_filtered_trascations_3_2.count()

468
20
20
20
20


##### a - 3 - other basic transformations (lazy)

In [57]:
# add a column
df_trs.withColumn('CA',(df_trs['quantity']*df_trs['unit_price'])).printSchema()

root
 |-- transaction_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- magasin: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: long (nullable = true)
 |-- CA: long (nullable = true)



In [58]:
# rename a column
df_trs.withColumnRenamed('unit_price','price').printSchema()

root
 |-- transaction_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- magasin: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price: long (nullable = true)



In [59]:
df_trs.drop('magasin').printSchema()

root
 |-- transaction_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: long (nullable = true)



In [60]:
print df_trs.count()
print df_trs.dropDuplicates(['quantity','unit_price']).count()

468
412


In [62]:
# select distinct elements 
df_tmp=df_trs.select('quantity','unit_price')
df2=df_tmp.distinct()

In [63]:
print df_tmp.count()
print df2.count()

468
412


In [64]:
print df_trs.dropDuplicates(['quantity','unit_price']).count()


412


In [65]:
#orderBY
print df_trs.select('date','magasin').orderBy("date", ascending=True).take(3)


[Row(date=u'2016-06-01', magasin=12), Row(date=u'2016-06-01', magasin=39), Row(date=u'2016-06-02', magasin=15)]


In [66]:
print df_trs.select('date','magasin').sort("date", ascending=True).take(3)

[Row(date=u'2016-06-01', magasin=39), Row(date=u'2016-06-01', magasin=12), Row(date=u'2016-06-02', magasin=2)]


In [67]:
print df_trs.select('date','magasin').sort(["date",'magasin'], ascending=True).take(3)

[Row(date=u'2016-06-01', magasin=12), Row(date=u'2016-06-01', magasin=39), Row(date=u'2016-06-02', magasin=2)]


In [68]:
import pyspark.sql.functions as F 
print df_trs.select('date','magasin').orderBy(F.asc('date')).take(3)
print df_trs.select('date','magasin').orderBy(F.asc('date'),F.desc('magasin')).take(3)
print df_trs.select('date','magasin').orderBy(F.asc('date'),F.asc('magasin')).take(3)

[Row(date=u'2016-06-01', magasin=12), Row(date=u'2016-06-01', magasin=39), Row(date=u'2016-06-02', magasin=15)]
[Row(date=u'2016-06-01', magasin=39), Row(date=u'2016-06-01', magasin=12), Row(date=u'2016-06-02', magasin=15)]
[Row(date=u'2016-06-01', magasin=12), Row(date=u'2016-06-01', magasin=39), Row(date=u'2016-06-02', magasin=2)]


In [69]:
print df_trs.select('date','magasin').orderBy(df_trs['date'].asc(), df_trs['date'].desc()).take(3)

[Row(date=u'2016-06-01', magasin=39), Row(date=u'2016-06-01', magasin=12), Row(date=u'2016-06-02', magasin=2)]


In [70]:
print df_trs.select('date','magasin').orderBy(F.col('date').asc(), F.col('magasin').desc()).take(3)

[Row(date=u'2016-06-01', magasin=39), Row(date=u'2016-06-01', magasin=12), Row(date=u'2016-06-02', magasin=15)]


In [76]:
#intersect - subtract - unionAll

In [77]:
df_trs_magasin_0=df_trs.filter('magasin=0')
df_trs_magasin_1=df_trs.filter('magasin=1')
df_trs_magasin_1_0=df_trs.filter('magasin=1 or magasin=0')

In [78]:
print df_trs_magasin_0.select('magasin').distinct().collect()

[Row(magasin=0)]


In [79]:
print df_trs_magasin_1.select('magasin').distinct().collect()

[Row(magasin=1)]


In [80]:
union_df=df_trs_magasin_0.unionAll(df_trs_magasin_1)
union_df.select('magasin').distinct().collect()

[Row(magasin=0), Row(magasin=1)]

In [81]:
union_df.subtract(df_trs_magasin_0).select('magasin').distinct().collect()

[Row(magasin=1)]

In [82]:
union_df.subtract(df_trs_magasin_1).select('magasin').distinct().collect()

[Row(magasin=0)]

In [83]:
union_df.intersect(df_trs_magasin_1).select('magasin').distinct().collect()

[Row(magasin=1)]

In [84]:
union_df.intersect(df_trs_magasin_0).select('magasin').distinct().collect()

[Row(magasin=0)]

In [72]:
# null values

In [71]:
#drop every row that has at least one null value
df_trs.dropna(how='any',subset=['transaction_id','date'])
#drop every row that has null values for all fields
df_trs.dropna(how='all',subset=['transaction_id','date'])

DataFrame[transaction_id: bigint, date: string, magasin: bigint, quantity: bigint, unit_price: bigint]

In [73]:
df_trs.fillna({'date': '2017-01-01', 'transaction_id': 0})

DataFrame[transaction_id: bigint, date: string, magasin: bigint, quantity: bigint, unit_price: bigint]

### b - aggregation

In [85]:
df_trs

DataFrame[transaction_id: bigint, date: string, magasin: bigint, quantity: bigint, unit_price: bigint]

In [86]:
df_trs.groupBy('magasin')

<pyspark.sql.group.GroupedData at 0x7fec1ac32c90>

In [87]:
print df_trs.groupBy('magasin').count()
print df_trs.groupBy('magasin').count().take(2)


DataFrame[magasin: bigint, count: bigint]
[Row(magasin=29, count=15), Row(magasin=26, count=6)]


In [89]:
#specific built in aggregation on grouped data


In [88]:
print df_trs.groupBy('magasin').count().first()
print df_trs.groupBy('magasin').avg('quantity','unit_price').first()
print df_trs.groupBy('magasin').sum('quantity').first()
print df_trs.groupBy('magasin').min('unit_price').first()
print df_trs.groupBy('magasin').max('unit_price').first()

Row(magasin=29, count=15)
Row(magasin=29, avg(quantity)=49.53333333333333, avg(unit_price)=10.333333333333334)
Row(magasin=29, sum(quantity)=743)
Row(magasin=29, min(unit_price)=2)
Row(magasin=29, max(unit_price)=18)


In [90]:
df_trs.groupBy('magasin').pivot('date')

<pyspark.sql.group.GroupedData at 0x7fec1ace45d0>

In [96]:
(df_trs
 .groupBy('magasin')
 .pivot('date',['2016-06-01','2016-06-02','2016-06-03','2016-06-04','2016-06-05','2016-06-06'])
 .mean('unit_price')
 .show(10)
)

+-------+----------+----------+----------+----------+----------+----------+
|magasin|2016-06-01|2016-06-02|2016-06-03|2016-06-04|2016-06-05|2016-06-06|
+-------+----------+----------+----------+----------+----------+----------+
|     29|      null|      null|      null|      null|      null|      null|
|     26|      null|      null|      null|      null|      null|      null|
|     19|      null|      null|      null|      null|      null|      null|
|      0|      null|      null|      null|      null|      null|      null|
|     22|      null|      null|      null|      null|      null|      null|
|      7|      null|      null|      null|      null|      null|      null|
|     34|      null|      null|      null|      null|      null|      null|
|     43|      null|      null|      null|      null|      null|      null|
|     32|      null|      null|      null|       4.0|      null|      null|
|     31|      null|      null|      null|      null|      null|      null|
+-------+---

In [97]:
#agg : for multiple different aggregations


In [98]:
#using dict 
df_trs.groupBy('magasin').agg({'unit_price': 'mean','quantity':'sum'}).show(5)

+-------+-------------+------------------+
|magasin|sum(quantity)|   avg(unit_price)|
+-------+-------------+------------------+
|     29|          743|10.333333333333334|
|     26|          200|11.833333333333334|
|     19|          509|               8.3|
|      0|          367|             10.25|
|     22|          463| 9.222222222222221|
+-------+-------------+------------------+
only showing top 5 rows



In [99]:
#using sql.functions
agg_columns_with_functions=[
                         F.count('*').alias('nb_transactions'),
                        F.countDistinct('date').alias('nb_days'),
                        (100*F.count('*')/F.countDistinct('date')).alias('ratio'),
                        F.min('CA').alias('min_CA'),
                        F.max('CA').alias('max_CA'),
                        F.sum('CA').alias('sum_CA'),
                        F.mean('CA').alias('mean_CA'),
                        F.stddev('CA').alias('stdv_CA'),
                        F.variance('CA').alias('variance_CA'),
                        F.skewness('CA').alias('skewness_CA'),
                        F.kurtosis('CA').alias('kurtosis_CA'),
                        F.first('CA').alias('first_CA'),
                        F.last('CA').alias('last_CA'),
    
]

In [100]:
stats_df=(df_trs.select('*',(F.col('quantity')*F.col('unit_price')).alias('CA'))
                .filter('CA>0')
                  .groupBy('magasin')
                  .agg(*agg_columns_with_functions)
          )

In [101]:
stats_df.first()

Row(magasin=29, nb_transactions=15, nb_days=15, ratio=100.0, min_CA=72, max_CA=1360, sum_CA=8214, mean_CA=547.6, stdv_CA=486.8742577474628, variance_CA=237046.54285714286, skewness_CA=0.6859056107514366, kurtosis_CA=-1.1731860174523876, first_CA=288, last_CA=72)

### c - jointures

In [102]:
######joins types :
#inner : only intersection keys
#left_outer : only keys in the left df
#right_outer : only keys in the right df
#outer : all keys in the right df and left df 

##############if the join key is not distinct in all rows of left df for ex,  
##############a left outer, or an outer join will generate for the same key at least two lines in the resulting df 


In [103]:
df_trs

DataFrame[transaction_id: bigint, date: string, magasin: bigint, quantity: bigint, unit_price: bigint]

In [104]:
df_magasins

DataFrame[magasin: bigint, pays: string, ville: string]

In [107]:
df_magasin_rows_added=df_magasins.unionAll(
    sc.parallelize([
            {'magasin': '9999', 'pays': 'fake', 'ville': 'fake'} , #  adding non exisiting magasin
            {'magasin': '2', 'pays': 'france', 'ville': 'rennes'}  # duplicating existing line
        ]).toDF()
)


In [106]:
df_trs.count()

468

In [108]:
df_trs.join(df_magasin_rows_added.distinct(),"magasin", 'inner').count()

468

In [110]:
df_trs.join(df_magasin_rows_added,"magasin").count()

480

In [111]:
df_trs.join(df_magasin_rows_added.distinct(),"magasin",'left_outer').count()

468

In [112]:
df_trs.join(df_magasin_rows_added.distinct(),"magasin",'right_outer').count()

469

In [113]:
df_trs.join(df_magasin_rows_added.distinct(),"magasin",'right_outer').filter('transaction_id is null').first()

Row(magasin=u'9999', transaction_id=None, date=None, quantity=None, unit_price=None, pays=u'fake', ville=u'fake')

In [114]:
# inner is the default type when not specified
df_trs.join(df_magasins,"magasin",'left_outer').count()

468

In [115]:
# inner is the default type when not specified
df_trs.join(
            df_magasin_rows_added.distinct().filter('magasin<>9999')
            ,"magasin",'outer'
            ).count()

468