# PySpark

In [1]:
import os
import pyspark
from pyspark import SparkContext
import sys

# Make Spark's driver and worker processes use the interpreter running this kernel.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

In [None]:
# getOrCreate() hands back the already-running context when this cell is
# executed again; calling the SparkContext constructor a second time in the
# same process raises instead.
sc = SparkContext.getOrCreate(
    pyspark.SparkConf().setMaster("local").setAppName("test")
)
sc

In [3]:
# Spread the integers 1..10 over 4 partitions, then fetch at most 100 of them.
rdd = sc.parallelize(range(1, 11), numSlices=4)
rdd.take(num=100)


[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

### Transform

map

In [4]:
# map: one output element per input element (here, each value doubled).
rdd = sc.parallelize(range(1, 11), numSlices=4)
rdd_map = rdd.map(lambda value: value * 2)
print(rdd.collect())
print(rdd_map.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]


flatMap

In [5]:
# Both calls split every sentence on spaces: map keeps one word list per
# sentence, flatMap flattens those lists into a single RDD of words.
rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"])
print(rdd2.collect())
print(rdd2.map(lambda sentence: sentence.split(" ")).collect())
print(rdd2.flatMap(lambda sentence: sentence.split(" ")).collect())

['hello SamShare', 'hello PySpark']
[['hello', 'SamShare'], ['hello', 'PySpark']]
['hello', 'SamShare', 'hello', 'PySpark']


filter

In [6]:
# filter: keep only the elements for which the predicate is true (even numbers).
rdd = sc.parallelize(range(1, 11), numSlices=4)
print(rdd.collect())
print(rdd.filter(lambda number: number % 2 == 0).collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[2, 4, 6, 8, 10]


distinct

In [7]:
# distinct: drop repeated values so each one appears once.
rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32])
print(rdd.collect())
print(rdd.distinct().collect())

[2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
[2, 4, 8, 16, 32]


reduceByKey

In [8]:
# `add` is also used by the later `reduce` cell, so this import must run first.
from operator import add
# reduceByKey: merge the values of each key with the given binary function.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.collect())
print(rdd.reduceByKey(add).collect())

[('a', 1), ('b', 1), ('a', 1)]
[('a', 2), ('b', 1)]


mapPartitions

In [9]:
# Four integers split across two partitions.
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator):
    """Yield exactly one value: the sum of all elements produced by `iterator`."""
    total = 0
    for element in iterator:
        total += element
    yield total
print(rdd.collect())
# f is called once per partition, so the result holds one sum per partition.
print(rdd.mapPartitions(f).collect())

[1, 2, 3, 4]
[3, 7]


sortBy

In [10]:
# sortBy: order the pairs first by their string key, then by their integer value.
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortBy(keyfunc=lambda pair: pair[0]).collect())
print(sc.parallelize(tmp).sortBy(keyfunc=lambda pair: pair[1]).collect())

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]


subtract

In [11]:
# subtract: elements of x that do not occur in y (whole tuples are compared,
# so only ("a", 3) is removed).
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
print(sorted(x.subtract(y).collect()))

[('a', 1), ('b', 4), ('b', 5)]


union

In [12]:
# union: concatenate two RDDs; duplicates are kept.
rdd = sc.parallelize([1, 1, 2, 3])
print(rdd.union(rdd).collect())

[1, 1, 2, 3, 1, 1, 2, 3]


intersection

In [13]:
# intersection: values present in both RDDs, each reported once.
# The result is not sorted here, so its order follows Spark's internal partitioning.
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
print(rdd1.intersection(rdd2).collect())

[2, 1, 3]


cartesian

In [14]:
# cartesian: every (a, b) pairing of the RDD with itself; sorted for stable display.
rdd = sc.parallelize([1, 2, 3])
print(sorted(rdd.cartesian(rdd).collect()))

[(1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3), (3, 1), (3, 2), (3, 3)]


zip

In [15]:
# zip: pair the i-th element of x with the i-th element of y.
x = sc.parallelize(range(5))
y = sc.parallelize(range(1000, 1005))
print(x.zip(y).collect())

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]


zipWithIndex

In [16]:
# zipWithIndex: attach a 0-based position to every element as (element, index).
rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())

[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]


groupByKey

In [18]:
# groupByKey gathers all values of a key into one iterable; mapValues then
# reduces that iterable to its length, a plain list, or its sum.
rdd = sc.parallelize([("a", 3), ("b", 1), ("a", 1)])
print(sorted(rdd.groupByKey().mapValues(len).collect()))
print(sorted(rdd.groupByKey().mapValues(list).collect()))
print(sorted(rdd.groupByKey().mapValues(sum).collect()))

[('a', 2), ('b', 1)]
[('a', [3, 1]), ('b', [1])]
[('a', 4), ('b', 1)]


sortByKey

In [23]:
# sortByKey: order the pairs by key, descending, using a single output partition.
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortByKey(ascending=False, numPartitions=1).collect())

[('d', 4), ('b', 2), ('a', 1), ('2', 5), ('1', 3)]


join

In [25]:
# join: inner join on the key; the value tuple is ordered (left value, right value),
# so swapping the operands swaps the tuple. Key "b" has no match and is dropped.
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
print(sorted(x.join(y).collect()))
print(sorted(y.join(x).collect()))

[('a', (1, 2)), ('a', (1, 3))]
[('a', (2, 1)), ('a', (3, 1))]


leftOuterJoin/rightOuterJoin

In [27]:
# leftOuterJoin keeps every key of x (missing right values become None);
# rightOuterJoin keeps every key of y.
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
print(sorted(x.leftOuterJoin(y).collect()))
print(sorted(x.rightOuterJoin(y).collect()))

[('a', (1, 2)), ('b', (4, None))]
[('a', (1, 2))]


### Action

collect

In [28]:
# collect: bring every element of the RDD back to the driver as a Python list.
rdd = sc.parallelize(range(0, 5))
rdd_collect = rdd.collect()
print(rdd_collect)

[0, 1, 2, 3, 4]


first

In [30]:
# first: return the first element of the RDD.
print(sc.parallelize([2, 3, 4]).first())

2


collectAsMap

In [31]:
# collectAsMap: return the (key, value) pairs to the driver as a dict.
print(sc.parallelize([(1, 2), (3, 4)]).collectAsMap())

{1: 2, 3: 4}


reduce

In [37]:
# reduce: fold all elements with a binary function. The lambda and
# operator.add (imported in the reduceByKey cell) give the same total.
rdd = sc.parallelize(range(10), numSlices=5)
print(rdd.reduce(lambda left, right: left + right))
print(rdd.reduce(add))

45
45


countByKey/countByValue

In [38]:
# countByKey counts occurrences per key; countByValue counts occurrences of
# each whole element (here, each (key, value) tuple).
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(sorted(rdd.countByKey().items()))
print(sorted(rdd.countByValue().items()))

[('a', 2), ('b', 1)]
[(('a', 1), 2), (('b', 1), 1)]


take

In [39]:
# take(n): return up to n elements; asking for more than exist returns them all.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.take(5))

[('a', 1), ('b', 1), ('a', 1)]


takeSample

In [43]:
# takeSample: draw 5 elements with replacement, using a fixed seed.
rdd = sc.parallelize(range(100), numSlices=4)
print(rdd.takeSample(withReplacement=True, num=5, seed=42))

[59, 55, 27, 70, 98]


foreach

In [44]:
# foreach runs the function for its side effect only; the accumulator carries
# the running total back to the driver, where `.value` is read.
rdd = sc.parallelize(range(10), numSlices=5)
accum = sc.accumulator(0)
rdd.foreach(lambda element: accum.add(element))
print(accum.value)

45


## SparkSQL

In [47]:
from pyspark.sql import SparkSession
# getOrCreate() returns an existing session if there is one, otherwise it builds a new one.
spark = SparkSession.builder.appName('testsql').getOrCreate()

# Rebind sc to the SparkContext that backs this session.
sc = spark.sparkContext

### SparkDataFrame

In [48]:
# Build a DataFrame from an RDD of tuples; only column names are supplied,
# so the column types are inferred from the data.
rdd = sc.parallelize([("Sam", 28, 88), ("Flora", 28, 90), ("Run", 1, 60)])
df = rdd.toDF(schema=["name", "age", "score"])
df.show()
df.printSchema()

+-----+---+-----+
| name|age|score|
+-----+---+-----+
|  Sam| 28|   88|
|Flora| 28|   90|
|  Run|  1|   60|
+-----+---+-----+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: long (nullable = true)



In [51]:
import pandas as pd
# The same three records, this time as a pandas DataFrame (note: `df` is now
# a pandas object, not the Spark DataFrame from the previous cell).
df = pd.DataFrame([['Sam', 28, 88], ['Flora', 28, 90], ['Run', 1, 60]],
                  columns=['name', 'age', 'score'])
print(df)
# Convert the pandas frame into a Spark DataFrame.
Spark_df = spark.createDataFrame(df)
Spark_df.show()

    name  age  score
0    Sam   28     88
1  Flora   28     90
2    Run    1     60
+-----+---+-----+
| name|age|score|
+-----+---+-----+
|  Sam| 28|   88|
|Flora| 28|   90|
|  Run|  1|   60|
+-----+---+-----+



In [52]:
# Build a Spark DataFrame directly from a list of rows plus column names.
list_values = [['Sam', 28, 88], ['Flora', 28, 90], ['Run', 1, 60]]
Spark_df = spark.createDataFrame(list_values, schema=['name', 'age', 'score'])
Spark_df.show()

+-----+---+-----+
| name|age|score|
+-----+---+-----+
|  Sam| 28|   88|
|Flora| 28|   90|
|  Run|  1|   60|
+-----+---+-----+



In [54]:
# The pandas frame is written together with its index, which is why the CSV
# read back below has an extra leading `_c0` column.
df.to_csv('test_csv.csv')

# Bind the Spark result to its own name: reassigning `df` here would replace
# the pandas frame and make `df.to_csv` fail if this cell is run a second time.
csv_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .csv("test_csv.csv")
)
csv_df.show(2)
csv_df.printSchema()

+---+-----+---+-----+
|_c0| name|age|score|
+---+-----+---+-----+
|  0|  Sam| 28|   88|
|  1|Flora| 28|   90|
+---+-----+---+-----+
only showing top 2 rows

root
 |-- _c0: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- score: integer (nullable = true)



### SparkDataFrame API

In [2]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# Select the master with the builder's master() setter. config("master", ...)
# only stores a key literally named "master"; the property Spark reads is
# "spark.master", so the previous call did not choose the master at all.
spark = (
    SparkSession.builder
    .appName("testsql")
    .master("local")
    .enableHiveSupport()
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Sample data reused by the rest of the notebook: five people, one of whom
# ("Run") has no value for `sex`.
rdd = sc.parallelize([
    ("Sam", 28, 88, "M"),
    ("Flora", 28, 90, "F"),
    ("Run", 1, 60, None),
    ("Peter", 55, 100, "M"),
    ("Mei", 54, 95, "F"),
])
df = rdd.toDF(schema=["name", "age", "score", "sex"])
df.show()
df.printSchema()

+-----+---+-----+----+
| name|age|score| sex|
+-----+---+-----+----+
|  Sam| 28|   88|   M|
|Flora| 28|   90|   F|
|  Run|  1|   60|null|
|Peter| 55|  100|   M|
|  Mei| 54|   95|   F|
+-----+---+-----+----+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: long (nullable = true)
 |-- sex: string (nullable = true)



view

In [5]:
# Return every row to the driver as a list of Row objects.
df.collect()

[Row(name='Sam', age=28, score=88, sex='M'),
 Row(name='Flora', age=28, score=90, sex='F'),
 Row(name='Run', age=1, score=60, sex=None),
 Row(name='Peter', age=55, score=100, sex='M'),
 Row(name='Mei', age=54, score=95, sex='F')]

In [6]:
# Number of rows in the DataFrame.
df.count()

5

In [7]:
# Column names, in order, as a Python list.
df.columns

['name', 'age', 'score', 'sex']

In [8]:
# (column name, Spark type name) pairs for every column.
df.dtypes

[('name', 'string'), ('age', 'bigint'), ('score', 'bigint'), ('sex', 'string')]

In [10]:
# Summary statistics (count, mean, stddev, min, max) for the `age` column only.
df.describe(['age']).show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 5|
|   mean|              33.2|
| stddev|22.353970564532826|
|    min|                 1|
|    max|                55|
+-------+------------------+



In [11]:
# Summary statistics for every column; mean/stddev are null for string columns.
df.describe().show()

+-------+-----+------------------+------------------+----+
|summary| name|               age|             score| sex|
+-------+-----+------------------+------------------+----+
|  count|    5|                 5|                 5|   4|
|   mean| null|              33.2|              86.6|null|
| stddev| null|22.353970564532826|15.582040944625966|null|
|    min|Flora|                 1|                60|   F|
|    max|  Sam|                55|               100|   M|
+-------+-----+------------------+------------------+----+



In [12]:
# Project just the two named columns, in the order given.
df.select("sex", "score").show()

+----+-----+
| sex|score|
+----+-----+
|   M|   88|
|   F|   90|
|null|   60|
|   M|  100|
|   F|   95|
+----+-----+



In [15]:
# The first row of the DataFrame as a Row object.
df.first()

Row(name='Sam', age=28, score=88, sex='M')

In [16]:
# Frequently occurring values per listed column, one array column each.
df.freqItems(["age", "sex"]).show()

+---------------+-------------+
|  age_freqItems|sex_freqItems|
+---------------+-------------+
|[55, 28, 1, 54]| [M, F, null]|
+---------------+-------------+



In [17]:
# Like describe(), but the output also includes the 25% / 50% / 75% percentiles.
df.summary().show()

+-------+-----+------------------+------------------+----+
|summary| name|               age|             score| sex|
+-------+-----+------------------+------------------+----+
|  count|    5|                 5|                 5|   4|
|   mean| null|              33.2|              86.6|null|
| stddev| null|22.353970564532826|15.582040944625966|null|
|    min|Flora|                 1|                60|   F|
|    25%| null|                28|                88|null|
|    50%| null|                28|                90|null|
|    75%| null|                54|                95|null|
|    max|  Sam|                55|               100|   M|
+-------+-----+------------------+------------------+----+



In [18]:
# Random sample in which each row is kept with probability 0.8, so the row
# count can vary. The seed is fixed (same value as the takeSample cell) so
# that re-running the notebook shows the same rows.
df.sample(fraction=0.8, seed=42).show()

+-----+---+-----+----+
| name|age|score| sex|
+-----+---+-----+----+
|Flora| 28|   90|   F|
|  Run|  1|   60|null|
|Peter| 55|  100|   M|
+-----+---+-----+----+



process

In [19]:
# Remove rows that are duplicated across all columns (none are, in this data).
df.distinct().show()

+-----+---+-----+----+
| name|age|score| sex|
+-----+---+-----+----+
|  Sam| 28|   88|   M|
|Flora| 28|   90|   F|
|  Run|  1|   60|null|
|Peter| 55|  100|   M|
|  Mei| 54|   95|   F|
+-----+---+-----+----+



In [20]:
# Keep one row per distinct value of `sex` (null counts as its own value).
# NOTE(review): which row survives for each value is presumably not guaranteed — confirm.
df.dropDuplicates(["sex"]).show()

+-----+---+-----+----+
| name|age|score| sex|
+-----+---+-----+----+
|  Run|  1|   60|null|
|Flora| 28|   90|   F|
|  Sam| 28|   88|   M|
+-----+---+-----+----+



In [21]:
# exceptAll removes one matching row of df1 per row in df2, so one of the two
# ("a", 1) rows survives; subtract removes every row that also appears in df2.
df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b",  3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
df3 = df1.exceptAll(df2)
df4 = df1.subtract(df2)

df3.show()
df4.show()

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  c|  4|
+---+---+

+---+---+
| C1| C2|
+---+---+
|  c|  4|
+---+---+



In [22]:
# Rows present in both df1 and df2 (df1/df2 come from the previous cell).
df1.intersectAll(df2).show()

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  b|  3|
+---+---+



In [23]:
# Return a DataFrame without the `age` column; df itself is not modified.
df.drop('age').show()

+-----+-----+----+
| name|score| sex|
+-----+-----+----+
|  Sam|   88|   M|
|Flora|   90|   F|
|  Run|   60|null|
|Peter|  100|   M|
|  Mei|   95|   F|
+-----+-----+----+



In [24]:
# Add a derived column. 2022 is a hard-coded reference year, so the computed
# birth years are only meaningful relative to that year.
df.withColumn('birth_year', 2022 - df.age).show()

+-----+---+-----+----+----------+
| name|age|score| sex|birth_year|
+-----+---+-----+----+----------+
|  Sam| 28|   88|   M|      1994|
|Flora| 28|   90|   F|      1994|
|  Run|  1|   60|null|      2021|
|Peter| 55|  100|   M|      1967|
|  Mei| 54|   95|   F|      1968|
+-----+---+-----+----+----------+



In [26]:
# Rename the `sex` column to `gender` in the returned DataFrame.
df.withColumnRenamed('sex', 'gender').show()

+-----+---+-----+------+
| name|age|score|gender|
+-----+---+-----+------+
|  Sam| 28|   88|     M|
|Flora| 28|   90|     F|
|  Run|  1|   60|  null|
|Peter| 55|  100|     M|
|  Mei| 54|   95|     F|
+-----+---+-----+------+



In [27]:
# Drop rows where all of the listed columns are null; with a single column
# in `subset` this removes every row whose `sex` is null.
df.dropna(how='all', subset=['sex']).show()

+-----+---+-----+---+
| name|age|score|sex|
+-----+---+-----+---+
|  Sam| 28|   88|  M|
|Flora| 28|   90|  F|
|Peter| 55|  100|  M|
|  Mei| 54|   95|  F|
+-----+---+-----+---+



In [28]:
# Replace nulls per column using a {column: replacement} mapping.
df.fillna({'sex': 'NB'}).show()

+-----+---+-----+---+
| name|age|score|sex|
+-----+---+-----+---+
|  Sam| 28|   88|  M|
|Flora| 28|   90|  F|
|  Run|  1|   60| NB|
|Peter| 55|  100|  M|
|  Mei| 54|   95|  F|
+-----+---+-----+---+



In [30]:
# Keep rows that satisfy both conditions: older than 50 and sex equal to 'M'.
df.filter((df.age > 50) & (df.sex == 'M')).show()

+-----+---+-----+---+
| name|age|score|sex|
+-----+---+-----+---+
|Peter| 55|  100|  M|
+-----+---+-----+---+



In [32]:
# Left join on id: every df1 row is kept, and df2 columns are null where
# there is no matching id. Result is sorted by id, descending.
df1 = spark.createDataFrame([("a", 1), ("d", 1), ("b", 3), ("c", 4)], ["id", "num1"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["id", "num2"])

(
    df1.join(df2, on=df1.id == df2.id, how='left')
    .select(
        df1.id.alias("df1_id"),
        df1.num1.alias("df1_num"),
        df2.num2.alias("df2_num"),
    )
    .sort(["df1_id"], ascending=False)
    .show()
)

+------+-------+-------+
|df1_id|df1_num|df2_num|
+------+-------+-------+
|     d|      1|   null|
|     c|      4|   null|
|     b|      3|      3|
|     a|      1|      1|
+------+-------+-------+



In [33]:
# One output row per `sex` value (null forms its own group), with the minimum
# age, the average age and the list of names in that group.
df.groupBy('sex').agg(
    F.min(df.age).alias('min_age'),
    F.expr('avg(age)').alias('avg_age'),
    F.expr('collect_list(name)').alias('name_list'),
).show()

+----+-------+-------+------------+
| sex|min_age|avg_age|   name_list|
+----+-------+-------+------------+
|   M|     28|   41.5|[Sam, Peter]|
|   F|     28|   41.0|[Flora, Mei]|
|null|      1|    1.0|       [Run]|
+----+-------+-------+------------+



In [37]:
def f(person):
    """Print the `name` attribute of a single row-like object."""
    name = person.name
    print(name)
    
# Call f once for every row of df.
# NOTE(review): no printed names were captured under this cell — presumably
# print() runs in the Spark worker process rather than in the notebook; confirm.
df.foreach(f)

In [39]:
# Expand the sex codes to full words. `subset` restricts the replacement to
# the `sex` column; without it the mapping is applied to every string column,
# so a `name` equal to "M" or "F" would be rewritten as well.
df.na.replace({"M": "Male", "F": "Female"}, subset=["sex"]).show()

+-----+---+-----+------+
| name|age|score|   sex|
+-----+---+-----+------+
|  Sam| 28|   88|  Male|
|Flora| 28|   90|Female|
|  Run|  1|   60|  null|
|Peter| 55|  100|  Male|
|  Mei| 54|   95|Female|
+-----+---+-----+------+



In [40]:
# union and unionAll give the same result here: rows of df2 appended to df1,
# with duplicates kept.
df1 = spark.createDataFrame([("a", 1), ("d", 1), ("b",  3), ("c", 4)], ["id", "num"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["id", "num"])
df1.union(df2).show()
df1.unionAll(df2).show()

+---+---+
| id|num|
+---+---+
|  a|  1|
|  d|  1|
|  b|  3|
|  c|  4|
|  a|  1|
|  b|  3|
+---+---+

+---+---+
| id|num|
+---+---+
|  a|  1|
|  d|  1|
|  b|  3|
|  c|  4|
|  a|  1|
|  b|  3|
+---+---+



In [43]:
# unionByName matches columns by name rather than by position, so df2's values
# are rearranged into df1's column order (col0=6, col1=4, col2=5).
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+



column operations

In [67]:
# Rows whose `sex` string ends with 'M'; the row with a null `sex` is not matched.
df.filter(df.sex.endswith('M')).show()

+-----+---+-----+---+
| name|age|score|sex|
+-----+---+-----+---+
|  Sam| 28|   88|  M|
|Peter| 55|  100|  M|
+-----+---+-----+---+



In [79]:
# Rows where `sex` is null.
df.filter(df.sex.isNull()).show()

+----+---+-----+----+
|name|age|score| sex|
+----+---+-----+----+
| Run|  1|   60|null|
+----+---+-----+----+



In [None]:
# Rows whose name is one of the listed values ("Mike" is not in the sample data).
df[df.name.isin("Peter", "Mike")].collect()

In [None]:
# Conditional column: 1 when age > 4, -1 when age < 3, otherwise 0.
# The derived column is not aliased, so it gets an auto-generated name.
df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()

transform

In [80]:
# Register df as a global temporary view named "people" (replacing any existing
# one); the next cell queries it as `global_temp.people`.
df.createOrReplaceGlobalTempView("people")

In [81]:
# Query the view registered in the previous cell with plain SQL.
spark.sql("select * from global_temp.people where sex = 'M' ").show()

+-----+---+-----+---+
| name|age|score|sex|
+-----+---+-----+---+
|  Sam| 28|   88|  M|
|Peter| 55|  100|  M|
+-----+---+-----+---+



In [82]:
# crossJoin pairs every row of df1 with every row of df2, so the row count
# of the result is the product of the two inputs (5 * 5 = 25).
df1 = df.select("name", "sex")
df2 = df.select("name", "sex")
df3 = df1.crossJoin(df2)

print(df1.count(), df2.count(), df3.count())

5 5 25


In [83]:
# Convert the Spark DataFrame into a pandas DataFrame on the driver.
df.toPandas()

Unnamed: 0,name,age,score,sex
0,Sam,28,88,M
1,Flora,28,90,F
2,Run,1,60,
3,Peter,55,100,M
4,Mei,54,95,F


In [84]:
# The DataFrame's content exposed as an RDD.
df.rdd

MapPartitionsRDD[172] at javaToPython at NativeMethodAccessorImpl.java:0

statistics

In [85]:
# Covariance between the `age` and `score` columns, returned as a float.
df.cov("age", "score")

324.6

In [86]:
# Pearson correlation coefficient between `age` and `score`.
df.corr("age", "score", method="pearson")

0.9319004030498816

In [90]:
# Row counts for every combination of (name, sex), including the subtotals
# where one or both columns are rolled up (shown as null in the output).
df.cube("name", "sex").count().show()

+-----+----+-----+
| name| sex|count|
+-----+----+-----+
|  Sam|   M|    1|
|  Sam|null|    1|
| null|null|    5|
| null|   M|    2|
| null|   F|    2|
|Flora|   F|    1|
|Flora|null|    1|
|  Run|null|    1|
|  Run|null|    1|
| null|null|    1|
|Peter|   M|    1|
|Peter|null|    1|
|  Mei|   F|    1|
|  Mei|null|    1|
+-----+----+-----+

