In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
import os

# Path to the MySQL Connector/J jar placed on the Spark driver classpath.
# Set the MYSQL_CONNECTOR_JAR environment variable to point elsewhere instead of
# editing this machine-specific default.
MYSQL_CONNECTOR_JAR = os.environ.get(
    "MYSQL_CONNECTOR_JAR",
    r"/home/data-taipei/pythonHarry/mysql-connector-j-8.2.0/mysql-connector-j-8.2.0.jar",
)

# NOTE(review): getOrCreate() may hand back an already-running session; JVM-level
# settings such as the driver classpath likely only apply when the JVM first starts,
# so restart the kernel after changing the jar path.
spark = (
    SparkSession
    .builder
    .appName("spark_practice")
    .config("spark.driver.extraClassPath", MYSQL_CONNECTOR_JAR)
    .getOrCreate()
)

24/03/19 14:35:06 WARN Utils: Your hostname, data-taipei resolves to a loopback address: 127.0.1.1; using 192.168.1.43 instead (on interface ens18)
24/03/19 14:35:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/19 14:35:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Read CSV files

In [3]:
import pandas as pd

# For comparison: the same file loaded with pandas.
# NOTE: the output below came from an earlier version of test.csv
# (it has an `Unnamed: 0` index column and no `experience` column).
pd.read_csv(filepath_or_buffer='test.csv')

Unnamed: 0,name,age
0,Harry,25
1,Steven,28
2,David,27


In [4]:
# Bare expression: Jupyter displays the SparkSession object created above.
spark

In [28]:
# Without inferSchema every column is read as a string; with inferSchema=True the
# numeric columns are detected (age and experience come back as double).
df_pyspark = (
    spark.read
    .option('header', 'true')            # first line of the file holds column names
    .csv('test.csv', inferSchema=True)
)
df_pyspark

DataFrame[name: string, age: double, experience: double]

In [29]:
# Print the DataFrame's rows as a text table.
df_pyspark.show()

+------+----+----------+
|  name| age|experience|
+------+----+----------+
| Harry|25.0|       5.0|
|Steven|28.0|      10.0|
| David|27.0|       7.0|
+------+----+----------+



In [30]:
# Confirm this is a PySpark DataFrame (not a pandas one).
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [31]:
# Unlike show(), head(n) returns the first n rows as a list of Row objects instead of printing a table.
df_pyspark.head(3)

[Row(name='Harry', age=25.0, experience=5.0),
 Row(name='Steven', age=28.0, experience=10.0),
 Row(name='David', age=27.0, experience=7.0)]

In [32]:
# Print each column's name, type, and nullability as a tree.
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- experience: double (nullable = true)



In [33]:
# Same read as above, but `header` is passed as a csv() keyword argument
# instead of via .option('header', 'true').
spark.read.csv(
    path='test.csv',
    inferSchema=True,
    header=True,
).printSchema()

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- experience: double (nullable = true)



In [34]:
# get column names (a plain Python list of strings)
df_pyspark.columns

['name', 'age', 'experience']

In [35]:
# select only the name column: select() returns a new DataFrame, show() prints it
df_pyspark.select('name').show()

+------+
|  name|
+------+
| Harry|
|Steven|
| David|
+------+



In [37]:
# Pass a list of column names to select several columns at once.
df_pyspark.select(['name', 'experience']).show()

+------+----------+
|  name|experience|
+------+----------+
| Harry|       5.0|
|Steven|      10.0|
| David|       7.0|
+------+----------+



In [39]:
# Summary statistics per column. mean/stddev are NULL for the string column `name`;
# min/max on strings compare lexicographically (David < Steven).
df_pyspark.describe().show()

+-------+------+------------------+-----------------+
|summary|  name|               age|       experience|
+-------+------+------------------+-----------------+
|  count|     3|                 3|                3|
|   mean|  NULL|26.666666666666668|7.333333333333333|
| stddev|  NULL|1.5275252316519468|2.516611478423583|
|    min| David|              25.0|              5.0|
|    max|Steven|              28.0|             10.0|
+-------+------+------------------+-----------------+



# add column

In [42]:
# withColumn returns a new DataFrame; rebind df_pyspark so later cells see the extra column.
df_pyspark = df_pyspark.withColumn('experience_after_2_years', df_pyspark['experience'] + 2)

In [44]:
# The new column is appended after the existing ones.
df_pyspark.show()

+------+----+----------+------------------------+
|  name| age|experience|experience_after_2_years|
+------+----+----------+------------------------+
| Harry|25.0|       5.0|                     7.0|
|Steven|28.0|      10.0|                    12.0|
| David|27.0|       7.0|                     9.0|
+------+----+----------+------------------------+



# drop column

In [48]:
# drop() also returns a new DataFrame; rebinding removes the helper column again.
df_pyspark = df_pyspark.drop('experience_after_2_years')

In [49]:
# Back to the original three columns.
df_pyspark.show()

+------+----+----------+
|  name| age|experience|
+------+----+----------+
| Harry|25.0|       5.0|
|Steven|28.0|      10.0|
| David|27.0|       7.0|
+------+----+----------+



# rename column

In [50]:
# The renamed DataFrame is only displayed, not assigned, so df_pyspark keeps the column `name`.
df_pyspark.withColumnRenamed('name', 'first_name').show()

+----------+----+----------+
|first_name| age|experience|
+----------+----+----------+
|     Harry|25.0|       5.0|
|    Steven|28.0|      10.0|
|     David|27.0|       7.0|
+----------+----+----------+



# handle missing values

In [83]:
# Reload test.csv; per the output below it now also has a `salary` column and contains nulls.
# This rebinds df_pyspark, replacing the DataFrame built in the cells above.
df_pyspark = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('test.csv')
)
df_pyspark.show()

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|  Harry|25.0|       5.0|50000.0|
| Steven|28.0|      10.0|70000.0|
|  David|27.0|       7.0|60000.0|
| Andrew|NULL|       4.0|60000.0|
|Charlie|24.0|      NULL|   NULL|
+-------+----+----------+-------+



In [56]:
# Drop every row that has at least one null value (here: Andrew and Charlie).
df_pyspark.na.drop().show()

+------+----+----------+-------+
|  name| age|experience| salary|
+------+----+----------+-------+
| Harry|25.0|       5.0|50000.0|
|Steven|28.0|      10.0|70000.0|
| David|27.0|       7.0|60000.0|
+------+----+----------+-------+



In [58]:
# how='all' drops a row only if every column is null; no row qualifies here, so all 5 remain.
df_pyspark.na.drop(how='all').show()

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|  Harry|25.0|       5.0|50000.0|
| Steven|28.0|      10.0|70000.0|
|  David|27.0|       7.0|60000.0|
| Andrew|NULL|       4.0|60000.0|
|Charlie|24.0|      NULL|   NULL|
+-------+----+----------+-------+



In [63]:
# threshold: keep only rows with at least 3 non-null values
# (Charlie has just 2 -> dropped; Andrew has 3 -> kept).
# `how` is omitted on purpose: when thresh is given it overrides `how`.
df_pyspark.na.drop(thresh=3).show()

+------+----+----------+-------+
|  name| age|experience| salary|
+------+----+----------+-------+
| Harry|25.0|       5.0|50000.0|
|Steven|28.0|      10.0|70000.0|
| David|27.0|       7.0|60000.0|
|Andrew|NULL|       4.0|60000.0|
+------+----+----------+-------+



In [64]:
# subset: only nulls in the listed columns are considered, so only Andrew (null age) is dropped;
# Charlie survives despite null experience/salary.
df_pyspark.na.drop(how='any', subset=['age']).show()

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|  Harry|25.0|       5.0|50000.0|
| Steven|28.0|      10.0|70000.0|
|  David|27.0|       7.0|60000.0|
|Charlie|24.0|      NULL|   NULL|
+-------+----+----------+-------+



## Fill missing (NA) values

In [70]:
# Replace nulls with 0 only in `age` and `salary`; `experience` keeps its NULL.
df_pyspark.na.fill(value=0, subset=['age', 'salary']).show()

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|  Harry|25.0|       5.0|50000.0|
| Steven|28.0|      10.0|70000.0|
|  David|27.0|       7.0|60000.0|
| Andrew| 0.0|       4.0|60000.0|
|Charlie|24.0|      NULL|    0.0|
+-------+----+----------+-------+



In [72]:
from pyspark.ml.feature import Imputer

# Columns to impute; each gets a companion "<col>_filled" output column.
impute_cols = ['age', 'experience', 'salary']
imputer = Imputer(
    strategy='mean',  # replace missing entries with the column mean
    inputCols=impute_cols,
    outputCols=[col + '_filled' for col in impute_cols],
)

In [73]:
# fit() learns the per-column means, transform() adds the *_filled columns
# (missing age -> 26.0, experience -> 6.5, salary -> 60000.0 in the output below).
imputer.fit(df_pyspark).transform(df_pyspark).show()

+-------+----+----------+-------+----------+-----------------+-------------+
|   name| age|experience| salary|age_filled|experience_filled|salary_filled|
+-------+----+----------+-------+----------+-----------------+-------------+
|  Harry|25.0|       5.0|50000.0|      25.0|              5.0|      50000.0|
| Steven|28.0|      10.0|70000.0|      28.0|             10.0|      70000.0|
|  David|27.0|       7.0|60000.0|      27.0|              7.0|      60000.0|
| Andrew|NULL|       4.0|60000.0|      26.0|              4.0|      60000.0|
|Charlie|24.0|      NULL|   NULL|      24.0|              6.5|      60000.0|
+-------+----+----------+-------+----------+-----------------+-------------+



# Filter operations

In [74]:
# filter() also accepts a SQL expression string.
df_pyspark.filter('salary <= 50000').show()

+-----+----+----------+-------+
| name| age|experience| salary|
+-----+----+----------+-------+
|Harry|25.0|       5.0|50000.0|
+-----+----+----------+-------+



In [78]:
# Combine Column conditions with | (or) / & (and); each comparison needs its own parentheses
# because | binds tighter than <= in Python.
# Charlie is kept: salary is NULL but age <= 25 is true, and NULL OR true is true.
df_pyspark.filter((df_pyspark['salary']<=50000) | (df_pyspark['age']<=25)).show()

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|  Harry|25.0|       5.0|50000.0|
|Charlie|24.0|      NULL|   NULL|
+-------+----+----------+-------+



In [79]:
# ~ negates a condition. Rows with NULL experience are excluded too (Charlie is absent):
# NOT(NULL <= 5) is still NULL, and filter keeps only rows whose condition is true.
df_pyspark.filter(~(df_pyspark['experience']<=5)).show()

+------+----+----------+-------+
|  name| age|experience| salary|
+------+----+----------+-------+
|Steven|28.0|      10.0|70000.0|
| David|27.0|       7.0|60000.0|
+------+----+----------+-------+



# Group by and aggregate

In [93]:
# NOTE: importing `sum` this way shadows Python's built-in sum() for the rest of the notebook.
from pyspark.sql.functions import lit, sum, avg, count

# Add a constant `team` column so every row falls into a single group.
df_pyspark = df_pyspark.withColumn('team', lit('data'))
# mean() with no arguments averages every numeric column; NULLs are skipped
# (avg(age) = 26.0 is the mean of the 4 non-null ages).
df_pyspark.groupBy('team').mean().show()

+----+--------+---------------+-----------+
|team|avg(age)|avg(experience)|avg(salary)|
+----+--------+---------------+-----------+
|data|    26.0|            6.5|    60000.0|
+----+--------+---------------+-----------+



In [87]:
# Count rows per distinct age; the NULL age forms its own group.
df_pyspark.groupBy('age').count().show()

+----+-----+
| age|count|
+----+-----+
|NULL|    1|
|25.0|    1|
|28.0|    1|
|27.0|    1|
|24.0|    1|
+----+-----+



In [94]:
# Use the functions module under a namespace so this cell does not depend on names
# imported in another cell and cannot be confused with Python builtins like sum().
# Requires the `team` column added in the group-by cell above.
from pyspark.sql import functions as F

df_pyspark.groupBy('team').agg(
    F.sum('salary').alias('sum_salary'),    # NULL salaries are skipped (240000.0 over 4 values)
    F.avg('age').alias('avg_age'),          # NULL ages are skipped
    F.count('name').alias('count_people'),  # counts non-null names
).show()

+----+----------+-------+------------+
|team|sum_salary|avg_age|count_people|
+----+----------+-------+------------+
|data|  240000.0|   26.0|           5|
+----+----------+-------+------------+

