## Dataframe manipulation

In [1]:
from pyspark import SparkContext
sc = SparkContext()
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
from pyspark.sql import SparkSession
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

### create DataFrame

In [2]:
# create from RDD
some_rdd = sc.parallelize([Row(name = 'John', age = 19),
                          Row(name = 'Bob', age = 21)])
some_df = sqlContext.createDataFrame(some_rdd)

In [3]:
# define schema
schema = StructType([StructField('personal_name', StringType(), False),
                    StructField('age', IntegerType(), False)])
another_df = sqlContext.createDataFrame(some_rdd, schema)
# another_df.printSchema()

In [4]:
# read from other type of files
# read from pandas
import pandas as pd
pd_df = sqlContext.createDataFrame(pd.DataFrame(range(5)))
# or 
pd_df = spark.createDataFrame(pd.DataFrame(range(5)))
# pd_df.printSchema()

In [5]:
# read from json
import json
data = [ { 'a' : 1, 'b' : 2, 'c' : 3, 'd' : 4, 'e' : 5 } ]
with open('test_data.json', 'w') as add:
    json.dump(data, add)
json_df = sqlContext.read.json('test_data.json')
# or 
json_df = spark.read.json('test_data.json')
# json_df.printSchema()

In [6]:
# read from csv
csv_df = spark.read.csv('titanic.csv', header = True)
# csv_df.printSchema()

In [7]:
# read from parquet
parquet_df = spark.read.load('part-00000-82db5734-5b30-4b12-8c75-3d0197e2f1b0-c000.snappy.parquet')
# parquet_df.printSchema()

In [8]:
# write to parquet
# csv_df.write.parquet("titanic.parquet")

### check DataFrame

In [9]:
# check some row
csv_df.show(5)

+-----------+---+-------+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|Passengerid|Age|   Fare|Sex|sibsp|zero5|zero6|zero7|zero8|zero9|zero10|zero11|Parch|zero13|zero14|zero15|zero16|zero17|zero18|zero19|zero20|Pclass|zero22|zero23|Embarked|zero25|zero26|2urvived|
+-----------+---+-------+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|          1| 22|   7.25|  0|    1|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     3|     0|     0|       2|     0|     0|       0|
|          2| 38|71.2833|  1|    1|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     1|     0|     0|       0|     0|     0|       1|
|          3| 26|  7.925|

In [10]:
# print schema
csv_df.printSchema()

root
 |-- Passengerid: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- sibsp: string (nullable = true)
 |-- zero5: string (nullable = true)
 |-- zero6: string (nullable = true)
 |-- zero7: string (nullable = true)
 |-- zero8: string (nullable = true)
 |-- zero9: string (nullable = true)
 |-- zero10: string (nullable = true)
 |-- zero11: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- zero13: string (nullable = true)
 |-- zero14: string (nullable = true)
 |-- zero15: string (nullable = true)
 |-- zero16: string (nullable = true)
 |-- zero17: string (nullable = true)
 |-- zero18: string (nullable = true)
 |-- zero19: string (nullable = true)
 |-- zero20: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- zero22: string (nullable = true)
 |-- zero23: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- zero25: string (nullable = true)
 |-- zero26: st

In [11]:
# take
aaa = csv_df.take(5)
type(aaa)

list

In [37]:
# to rdd
df_rdd = csv_df.rdd

In [38]:
# describe 和 summary 使用区别
csv_df.describe('Sex').show()
csv_df.summary('mean').show()

+-------+------------------+
|summary|               Sex|
+-------+------------------+
|  count|              1309|
|   mean|0.3559969442322384|
| stddev|0.4789972834413279|
|    min|                 0|
|    max|                 1|
+-------+------------------+

+-------+-----------+-----------------+-----------------+------------------+------------------+-----+-----+-----+-----+-----+------+------+------------------+------+------+------+------+------+------+------+------+-----------------+------+------+------------------+------+------+------------------+
|summary|Passengerid|              Age|             Fare|               Sex|             sibsp|zero5|zero6|zero7|zero8|zero9|zero10|zero11|             Parch|zero13|zero14|zero15|zero16|zero17|zero18|zero19|zero20|           Pclass|zero22|zero23|          Embarked|zero25|zero26|          2urvived|
+-------+-----------+-----------------+-----------------+------------------+------------------+-----+-----+-----+-----+-----+------+------+-

In [39]:
# count
csv_df.count()

1309

In [40]:
# is null
from pyspark.sql.functions import isnull
csv_df.filter(isnull("Age"))

DataFrame[Passengerid: string, Age: string, Fare: string, Sex: string, sibsp: string, zero5: string, zero6: string, zero7: string, zero8: string, zero9: string, zero10: string, zero11: string, Parch: string, zero13: string, zero14: string, zero15: string, zero16: string, zero17: string, zero18: string, zero19: string, zero20: string, Pclass: string, zero22: string, zero23: string, Embarked: string, zero25: string, zero26: string, 2urvived: string]

In [41]:
# describe
csv_df.select('sex').describe().show()

+-------+------------------+
|summary|               sex|
+-------+------------------+
|  count|              1309|
|   mean|0.3559969442322384|
| stddev|0.4789972834413279|
|    min|                 0|
|    max|                 1|
+-------+------------------+



In [42]:
# distinct
csv_df.select('sex').distinct().count()

2

In [43]:
# sample
csv_df.sample(.5).count()

638

In [44]:
# check columns
type(csv_df.columns)

list

### select

In [45]:
df = csv_df

In [46]:
# select columns
# direct (class cloumn)
df.Passengerid 
# by Colname (class dataframe)
df.select('Passengerid')
df.select(df['Passengerid'], df['Sex']+1)
df.select(df.Passengerid, df.Sex)

DataFrame[Passengerid: string, Sex: string]

In [47]:
# select row with some restrictions
df.where("Passengerid == '1' and Sex = '1'")

DataFrame[Passengerid: string, Age: string, Fare: string, Sex: string, sibsp: string, zero5: string, zero6: string, zero7: string, zero8: string, zero9: string, zero10: string, zero11: string, Parch: string, zero13: string, zero14: string, zero15: string, zero16: string, zero17: string, zero18: string, zero19: string, zero20: string, Pclass: string, zero22: string, zero23: string, Embarked: string, zero25: string, zero26: string, 2urvived: string]

### sort

In [48]:
df.orderBy("Sex")
df.orderBy(df.Sex.asc())

DataFrame[Passengerid: string, Age: string, Fare: string, Sex: string, sibsp: string, zero5: string, zero6: string, zero7: string, zero8: string, zero9: string, zero10: string, zero11: string, Parch: string, zero13: string, zero14: string, zero15: string, zero16: string, zero17: string, zero18: string, zero19: string, zero20: string, Pclass: string, zero22: string, zero23: string, Embarked: string, zero25: string, zero26: string, 2urvived: string]

### add and change column

In [49]:
# 注意withcol不能直接加上list，只能通过列操作
df.withColumn("Sex2", 2 + df.Sex).show(1)

+-----------+---+----+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+----+
|Passengerid|Age|Fare|Sex|sibsp|zero5|zero6|zero7|zero8|zero9|zero10|zero11|Parch|zero13|zero14|zero15|zero16|zero17|zero18|zero19|zero20|Pclass|zero22|zero23|Embarked|zero25|zero26|2urvived|Sex2|
+-----------+---+----+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+----+
|          1| 22|7.25|  0|    1|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     3|     0|     0|       2|     0|     0|       0| 2.0|
+-----------+---+----+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+----+
only showing to

In [50]:
# change datatype
df.withColumn("Sex", df.Sex.cast(IntegerType()))

DataFrame[Passengerid: string, Age: string, Fare: string, Sex: int, sibsp: string, zero5: string, zero6: string, zero7: string, zero8: string, zero9: string, zero10: string, zero11: string, Parch: string, zero13: string, zero14: string, zero15: string, zero16: string, zero17: string, zero18: string, zero19: string, zero20: string, Pclass: string, zero22: string, zero23: string, Embarked: string, zero25: string, zero26: string, 2urvived: string]

In [51]:
# Change column number
df.withColumn("Sex", df.sibsp).show(1)

+-----------+---+----+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|Passengerid|Age|Fare|Sex|sibsp|zero5|zero6|zero7|zero8|zero9|zero10|zero11|Parch|zero13|zero14|zero15|zero16|zero17|zero18|zero19|zero20|Pclass|zero22|zero23|Embarked|zero25|zero26|2urvived|
+-----------+---+----+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|          1| 22|7.25|  1|    1|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     3|     0|     0|       2|     0|     0|       0|
+-----------+---+----+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
only showing top 1 row



In [52]:
# Rename
df.withColumnRenamed( "Sex" , "SSex").show(1)

+-----------+---+----+----+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|Passengerid|Age|Fare|SSex|sibsp|zero5|zero6|zero7|zero8|zero9|zero10|zero11|Parch|zero13|zero14|zero15|zero16|zero17|zero18|zero19|zero20|Pclass|zero22|zero23|Embarked|zero25|zero26|2urvived|
+-----------+---+----+----+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|          1| 22|7.25|   0|    1|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     3|     0|     0|       2|     0|     0|       0|
+-----------+---+----+----+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
only showing top 1 row



### join union

In [53]:
df1 = spark.createDataFrame(
        [("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b",  3), ("c", 4)], ["C1", "C2"])
df1.show(5)

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  a|  1|
|  a|  2|
|  b|  3|
+---+---+
only showing top 5 rows



In [54]:
df.join(df1, df.Sex == df1.C2, 'outer').show(1)

+-----------+---+----+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+----+----+
|Passengerid|Age|Fare|Sex|sibsp|zero5|zero6|zero7|zero8|zero9|zero10|zero11|Parch|zero13|zero14|zero15|zero16|zero17|zero18|zero19|zero20|Pclass|zero22|zero23|Embarked|zero25|zero26|2urvived|  C1|  C2|
+-----------+---+----+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+----+----+
|          1| 22|7.25|  0|    1|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     3|     0|     0|       2|     0|     0|       0|null|null|
+-----------+---+----+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+-

In [55]:
# union (union rows and through column number!)
csv_df.select('Sex', 'Passengerid').union(csv_df.select('Fare', 'Age')).count() # double the number

2618

In [56]:
# substract
df.select("Sex").subtract(df.select("Fare")).show()

+---+
|Sex|
+---+
|  1|
+---+



In [57]:
# intersect
df.select("Sex").intersect(df.select("Sex")).show(5)

+---+
|Sex|
+---+
|  0|
|  1|
+---+



### statistic

In [58]:
# freqitem
df.freqItems(["Sex"] , 0.5).show() #如下所见，容易出现错误，算法导致的False Positive
df.groupBy("Sex").count().show()

+-------------+
|Sex_freqItems|
+-------------+
|       [1, 0]|
+-------------+

+---+-----+
|Sex|count|
+---+-----+
|  0|  843|
|  1|  466|
+---+-----+



In [59]:
df.dtypes

[('Passengerid', 'string'),
 ('Age', 'string'),
 ('Fare', 'string'),
 ('Sex', 'string'),
 ('sibsp', 'string'),
 ('zero5', 'string'),
 ('zero6', 'string'),
 ('zero7', 'string'),
 ('zero8', 'string'),
 ('zero9', 'string'),
 ('zero10', 'string'),
 ('zero11', 'string'),
 ('Parch', 'string'),
 ('zero13', 'string'),
 ('zero14', 'string'),
 ('zero15', 'string'),
 ('zero16', 'string'),
 ('zero17', 'string'),
 ('zero18', 'string'),
 ('zero19', 'string'),
 ('zero20', 'string'),
 ('Pclass', 'string'),
 ('zero22', 'string'),
 ('zero23', 'string'),
 ('Embarked', 'string'),
 ('zero25', 'string'),
 ('zero26', 'string'),
 ('2urvived', 'string')]

In [60]:
# df.approxQuantile('Sex',[0.5], 0.25) #失败，因为是string 
df = df.withColumn("Sex", df.Sex.cast(IntegerType())) # 转换为int
df.approxQuantile('Sex',[0.6], 0.01)

[0.0]

In [62]:
import pyspark.sql.functions as F
df.agg(*[(1 - (F.count(c)/F.count('*'))).alias(c+"_missing")
    for c in df.columns]
    ).show()

+-------------------+-----------+------------+-----------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------+--------------+----------------+
|Passengerid_missing|Age_missing|Fare_missing|Sex_missing|sibsp_missing|zero5_missing|zero6_missing|zero7_missing|zero8_missing|zero9_missing|zero10_missing|zero11_missing|Parch_missing|zero13_missing|zero14_missing|zero15_missing|zero16_missing|zero17_missing|zero18_missing|zero19_missing|zero20_missing|Pclass_missing|zero22_missing|zero23_missing|    Embarked_missing|zero25_missing|zero26_missing|2urvived_missing|
+-------------------+-----------+------------+-----------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+---

In [63]:
# groupBy use cols or function
df.groupBy('Fare').agg({'Sex': 'mean', 'Age': 'mean'}).show(5)
df.groupBy('Fare').agg({'Sex': 'mean', 'Sex': 'min'}).show(5) # when you apply two function into one column will reslt in problem
# use following instead
import pyspark.sql.functions as F
df.groupBy('Fare').agg(F.mean(df.Sex), F.min(df.Sex)).show(5)
# 这么写也行
df.groupBy('Fare').agg(F.mean(df.Sex)- F.min(df.Sex)).show(5)
# 想对所有进行agg
exprs = {x: "sum" for x in ['Sex', 'Age', 'Fare']}
df.select('Sex', 'Age', 'Fare').groupBy("Fare").agg(exprs).show(5)



+--------+------------------+--------+
|    Fare|          avg(Sex)|avg(Age)|
+--------+------------------+--------+
|   7.125|               0.0|   28.75|
|   26.25|0.6666666666666666|    27.5|
|110.8833|               0.5|   33.25|
|    69.3|               1.0|    24.0|
|   12.35|               0.4|    42.2|
+--------+------------------+--------+
only showing top 5 rows

+--------+--------+
|    Fare|min(Sex)|
+--------+--------+
|   7.125|       0|
|   26.25|       0|
|110.8833|       0|
|    69.3|       1|
|   12.35|       0|
+--------+--------+
only showing top 5 rows

+--------+------------------+--------+
|    Fare|          avg(Sex)|min(Sex)|
+--------+------------------+--------+
|   7.125|               0.0|       0|
|   26.25|0.6666666666666666|       0|
|110.8833|               0.5|       0|
|    69.3|               1.0|       1|
|   12.35|               0.4|       0|
+--------+------------------+--------+
only showing top 5 rows

+--------+---------------------+
|    Fare|

In [64]:
# 另UDF
from pyspark.sql.functions import udf
m_fm = udf(lambda Sex: 'Male' if Sex == 1 else "Female", StringType())
df.withColumn("Sex", m_fm(df.Sex)).show(5)

+-----------+---+-------+------+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|Passengerid|Age|   Fare|   Sex|sibsp|zero5|zero6|zero7|zero8|zero9|zero10|zero11|Parch|zero13|zero14|zero15|zero16|zero17|zero18|zero19|zero20|Pclass|zero22|zero23|Embarked|zero25|zero26|2urvived|
+-----------+---+-------+------+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|          1| 22|   7.25|Female|    1|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     3|     0|     0|       2|     0|     0|       0|
|          2| 38|71.2833|  Male|    1|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     1|     0|     0|       0|     0|     0|       1|
|         

In [65]:
df.foreach(lambda row: row.Sex)

### mapreduce

In [66]:
# map
df.select('Sex').rdd.map(lambda x:(x,1)).take(5) # 使用情景，当你想对每行单独进行操作时

[(Row(Sex=0), 1),
 (Row(Sex=1), 1),
 (Row(Sex=1), 1),
 (Row(Sex=1), 1),
 (Row(Sex=0), 1)]

### drop

In [67]:
# drop column
df.drop('Sex')

DataFrame[Passengerid: string, Age: string, Fare: string, sibsp: string, zero5: string, zero6: string, zero7: string, zero8: string, zero9: string, zero10: string, zero11: string, Parch: string, zero13: string, zero14: string, zero15: string, zero16: string, zero17: string, zero18: string, zero19: string, zero20: string, Pclass: string, zero22: string, zero23: string, Embarked: string, zero25: string, zero26: string, 2urvived: string]

In [68]:
# dropna 针对row
df.dropna(subset=['Sex', 'Fare'])

DataFrame[Passengerid: string, Age: string, Fare: string, Sex: int, sibsp: string, zero5: string, zero6: string, zero7: string, zero8: string, zero9: string, zero10: string, zero11: string, Parch: string, zero13: string, zero14: string, zero15: string, zero16: string, zero17: string, zero18: string, zero19: string, zero20: string, Pclass: string, zero22: string, zero23: string, Embarked: string, zero25: string, zero26: string, 2urvived: string]

### distinct

In [69]:
df.select('Sex', 'Fare').distinct().show()

+---+-------+
|Sex|   Fare|
+---+-------+
|  0|    263|
|  0|18.7875|
|  1| 8.6833|
|  0|   7.25|
|  1| 7.7333|
|  0|     39|
|  1|   69.3|
|  1|     12|
|  0|14.4542|
|  0| 8.4333|
|  0| 7.8958|
|  0|     14|
|  1|27.7208|
|  0|13.8625|
|  1|     57|
|  0|   6.45|
|  1|   53.1|
|  1|     55|
|  1|  12.35|
|  1|13.8583|
+---+-------+
only showing top 20 rows



In [70]:
df.dropDuplicates(subset = [c for c in df.columns if c != 'Passengerid']).show()

+-----------+---+--------+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|Passengerid|Age|    Fare|Sex|sibsp|zero5|zero6|zero7|zero8|zero9|zero10|zero11|Parch|zero13|zero14|zero15|zero16|zero17|zero18|zero19|zero20|Pclass|zero22|zero23|Embarked|zero25|zero26|2urvived|
+-----------+---+--------+---+-----+-----+-----+-----+-----+-----+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+--------+------+------+--------+
|        725| 27|    53.1|  0|    1|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     1|     0|     0|       2|     0|     0|       1|
|       1033| 33|  151.55|  1|    0|    0|    0|    0|    0|    0|     0|     0|    0|     0|     0|     0|     0|     0|     0|     0|     0|     1|     0|     0|       2|     0|     0|       0|
|        873| 33|   