# **PySpark Basic Operations**

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("My First pyspark app") \
        .getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/13 14:34:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## **Load Files**

### **Json**

In [2]:
df_json = spark.read.option("multiline", True).json("data/03.json")

In [3]:
df_json.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: long (nullable = true)
 |-- age: long (nullable = true)
 |-- children: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hobbies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- married: boolean (nullable = true)
 |-- name: string (nullable = true)



### **CSV**

1. All columns are in string format
1. Automatic inference of columns types
1. Custom columns types
1. Read CSV files from a specific directory

In [4]:
# All columns are in string format
df_csv_string = spark.read.csv("data/03.csv", header=True) 

In [5]:
# Automatic inference of columns types
df_csv_auto = spark.read.csv("data/03.csv", header=True, inferSchema=True) 

In [6]:
# Custom columns types
from pyspark.sql.types import *

custom_types = StructType(

    [
        StructField(name='A', dataType=DoubleType(), nullable=True),
        StructField(name='B', dataType=StringType(), nullable=True),
        StructField(name='C', dataType=IntegerType(), nullable=True)
    ]

)

df_csv_custom = spark.read.csv("data/03.csv", header=True, schema=custom_types)
df_csv_custom.printSchema()

root
 |-- A: double (nullable = true)
 |-- B: string (nullable = true)
 |-- C: integer (nullable = true)



In [7]:
# Read CSV files from a specific directory
df_csv_folder = spark.read.csv("data/03-many-csv", header=True)
df_csv_folder.printSchema()
df_csv_folder.show()

root
 |-- A: string (nullable = true)
 |-- B: string (nullable = true)
 |-- C: string (nullable = true)

+---+---+---+
|  A|  B|  C|
+---+---+---+
|1.0|  2|  3|
|4.0|  5|  6|
|7.0|  8|  9|
|2.0|  2|  2|
|2.0|  2|  2|
|2.0|  2|  2|
+---+---+---+



### **Pandas**

!!! note "Pandas 的特色[^pandas-1]"
    1. Scalability beyond a **single** machine  
    1. Interactive data visualization  
    1. Leveraging unified analytics functionality in Spark  

[^pandas-1]:
    [Pandas API on Upcoming Apache Spark™ 3.2](https://www.databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html)

In [8]:
from pyspark import pandas as ps



In [9]:
# From csv
df_with_spark_pandas = ps.read_csv("data/03.csv")
df_with_spark_pandas



Unnamed: 0,A,B,C
0,1.0,2,3
1,4.0,5,6
2,7.0,8,9


### **Parquet**

TODO

## **Transform**

In [28]:
df = df_csv_custom.rdd # Transform into rdd format

In [29]:
demo_map = df.map(lambda row : row["A"]**2)
demo_map.collect()

[1.0, 16.0, 49.0]

In [33]:
demo_filter = df.filter(lambda row : row["A"] > 5)
demo_filter.collect()

[Row(A=7.0, B='8', C=9)]

| 轉換 | 範例 |
| --- | --- |
| map() | 將函數應用於RDD中的每個元素，並返回一個新的RDD。例如，rdd.map(lambda x: x * 2)會將RDD中的每個元素乘以2。 |
| flatMap() | 將函數應用於RDD中的每個元素，並返回一個新的RDD，其中包含函數返回的所有元素。例如，rdd.flatMap(lambda x: x.split(" "))會將RDD中的每個字符串按空格分割，並返回一個包含所有單詞的RDD。 |
| filter() | 根據函數的返回值過濾RDD中的元素，並返回一個新的RDD。例如，rdd.filter(lambda x: x % 2 == 0)會過濾掉RDD中的奇數元素。 |
| groupByKey() | 根據鍵將RDD中的元素分組，並返回一個新的RDD，其中每個鍵對應一個可迭代的值序列。例如，rdd.groupByKey()會將RDD中的鍵值對按鍵分組。 |
| reduceByKey() | 根據鍵將RDD中的元素分組，並使用函數對每組值進行聚合，並返回一個新的RDD，其中每個鍵對應一個聚合後的值。例如，rdd.reduceByKey(lambda x, y: x + y)會將RDD中的鍵值對按鍵分組，並對每組值求和。 |
| sortByKey() | 根據鍵將RDD中的元素排序，並返回一個新的RDD。例如，rdd.sortByKey()會將RDD中的鍵值對按鍵升序排序。 |
| union() | 將兩個或多個RDD合併為一個新的RDD。例如，rdd1.union(rdd2)會將rdd1和rdd2合併為一個新的RDD。 |
| intersection() | 返回兩個或多個RDD共有的元素組成的新的RDD。例如，rdd1.intersection(rdd2)會返回rdd1和rdd2共有的元素組成的新的RDD。 |
| distinct() | 返回去除重複元素後的新的RDD。例如，rdd.distinct()會返回去除重複元素後的新的RDD。 |
| join() | 根據鍵將兩個或多個RDD中的元素連接起來，並返回一個新的RDD，其中每個鍵對應一個由兩個或多個值組成的元組。例如，rdd1.join(rdd2)會根據鍵將rdd1和rdd2中的元素連接起來，並返回一個新的RDD。 |
| leftOuterJoin() | 根據鍵將兩個或多個RDD中的元素連接起來，並返回一個新的RDD，其中每個鍵對應一個由左邊值和右邊值（如果存在