<a href="https://colab.research.google.com/github/JihunSKKU/PySpark/blob/main/SparkSQL_json_DB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SparkSQL - Lec08

## Saving a DataFrame and Converting Data to a DataFrame

In [None]:
import json
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

# Reuse the running session if there is one (e.g. when this cell is re-run).
# Calling SparkContext() directly raises an error once a context already exists.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Convert input data to a DataFrame
**Row()**
- Lets you access fields by name, like the attributes (columns) of a table in an RDBMS
- e.g., `row1 = Row(age=11, name='Alice')`, \
    `row1.name` ⇒ `'Alice'`, `row1.age` ⇒ `11`

**toDF()**
- Convert RDD to DataFrame

In [None]:
from pyspark.sql import Row

# Each Row holds a string id plus a dict; the dict becomes a map column (see output).
dept_rows = [
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Suwon'}),
    Row(dept_id='3', dept_info={'name': 'R&D', 'loc': 'Seoul'}),
]
RDD = sc.parallelize(dept_rows)

DF = RDD.toDF()  # RDD -> DataFrame
DF.show(truncate=False)

+-------+---------------------------+
|dept_id|dept_info                  |
+-------+---------------------------+
|1      |{name -> CS, loc -> Seoul} |
|2      |{name -> CS, loc -> Suwon} |
|3      |{name -> R&D, loc -> Seoul}|
+-------+---------------------------+



### Save DataFrame in JSON format

In [None]:
from pyspark.sql import Row

DF = sc.parallelize([
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Suwon'}),
    Row(dept_id='3', dept_info={'name': 'R&D', 'loc': 'Seoul'})]).toDF()

# Spark writes a directory of part-files here, not a single file.
# The default save mode fails if the directory already exists, so a re-run of this
# cell would raise; "overwrite" replaces the previous output instead.
DF.write.mode("overwrite").json("./SKKU-DBP-24/DF_json")

In [None]:
# Optional cleanup (shell command, Colab path): uncomment to delete the JSON output written above.
# !rm -rf /content/SKKU-DBP-24/DF_json

### Convert JSON data to a DataFrame

In [None]:
import json
from pyspark.sql import SQLContext

from pyspark.sql import Row

# Read the part-files back as plain text and parse each line into a Python dict.
RDD = sc.textFile("./SKKU-DBP-24/DF_json/*").map(json.loads)

# SQLContext is deprecated in favour of SparkSession, and inferring a schema from plain
# dicts is deprecated too. Wrap each dict in a Row instead; the nested dept_info dict
# is still inferred as a map column, as the output shows.
DF = spark.createDataFrame(RDD.map(lambda record: Row(**record)))

DF.show(truncate=False)  # truncate=False: show all data

+-------+---------------------------+
|dept_id|dept_info                  |
+-------+---------------------------+
|1      |{name -> CS, loc -> Seoul} |
|2      |{name -> CS, loc -> Suwon} |
|3      |{name -> R&D, loc -> Seoul}|
+-------+---------------------------+



In [None]:
import json
from pyspark.sql import SQLContext

# Let Spark's JSON reader parse the files directly. Unlike the RDD route, the reader
# infers dept_info as a struct with fields in the order (loc, name). That is why rows
# print as {Seoul, CS} here rather than {name -> CS, loc -> Seoul}.
DF = spark.read.json("./SKKU-DBP-24/DF_json/*")

# Register the DataFrame for SQL queries under the name "dept".
# registerTempTable is deprecated; createOrReplaceTempView is its replacement and is
# safe to re-run.
DF.createOrReplaceTempView("dept")

DF.show(truncate=False)  # truncate=False: show all data

+-------+------------+
|dept_id|dept_info   |
+-------+------------+
|2      |{Suwon, CS} |
|3      |{Seoul, R&D}|
|1      |{Seoul, CS} |
+-------+------------+



## Spark SQL Operations

In [None]:
from pyspark.sql import Row

dept_rows = [
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Suwon'}),
]
RDD = sc.parallelize(dept_rows)
DF = RDD.toDF()

# The dotted path reaches into the dept_info map and returns the value under "name".
DF.select("dept_info.name").show(truncate=False)

+----+
|name|
+----+
|CS  |
|CS  |
+----+



In [None]:
from pyspark.sql import Row

dept_rows = [
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Suwon'}),
]
RDD = sc.parallelize(dept_rows)
DF = RDD.toDF()

# Mix a top-level column with a value pulled out of the nested map.
DF.select("dept_id", "dept_info.loc").show(truncate=False)

+-------+-----+
|dept_id|loc  |
+-------+-----+
|1      |Seoul|
|2      |Suwon|
+-------+-----+



### select(column_name): three ways

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import col

RDD = sc.parallelize([
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Suwon'}),
])
DF = RDD.toDF()

# 1) Attribute access on the DataFrame object
DF.select(DF.dept_id).show()

# 2) col(): a column reference that is not tied to a particular DataFrame variable
DF.select(col("dept_id")).show()

# 3) Plain column-name string
DF.select("dept_id").show()

+-------+
|dept_id|
+-------+
|      1|
|      2|
+-------+

+-------+
|dept_id|
+-------+
|      1|
|      2|
+-------+

+-------+
|dept_id|
+-------+
|      1|
|      2|
+-------+



### Convert a column's type

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import IntegerType

RDD = sc.parallelize([
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Suwon'}),
])
DF = RDD.toDF()

# Replace the string dept_id column with an integer version under the same name.
int_id = DF.dept_id.cast(IntegerType())
DF_id = DF.withColumn("dept_id", int_id)
DF_id.printSchema()

root
 |-- dept_id: integer (nullable = true)
 |-- dept_info: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



### Add a new column

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import col, exp, lit

RDD = sc.parallelize([
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Suwon'})])
DF = RDD.toDF()

# lit() wraps a Python literal as a column, so every row gets the same constant.
DF_with_new1 = DF.withColumn("new1", lit(0))
DF_with_new1.show(truncate=False)

# dept_id is a string column. Cast it to double explicitly before exp() rather than
# relying on implicit string->double coercion, which stricter (ANSI) type-coercion
# settings may reject.
DF_with_new2 = DF.withColumn("new2", exp(col("dept_id").cast("double")))
DF_with_new2.show(truncate=False)

+-------+--------------------------+----+
|dept_id|dept_info                 |new1|
+-------+--------------------------+----+
|1      |{name -> CS, loc -> Seoul}|0   |
|2      |{name -> CS, loc -> Suwon}|0   |
+-------+--------------------------+----+

+-------+--------------------------+------------------+
|dept_id|dept_info                 |new2              |
+-------+--------------------------+------------------+
|1      |{name -> CS, loc -> Seoul}|2.7182818284590455|
|2      |{name -> CS, loc -> Suwon}|7.38905609893065  |
+-------+--------------------------+------------------+



### Add new rows

In [None]:
from pyspark.sql import Row

# DF1 with 2 rows
first_rows = [
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Busan'}),
]
DF1 = sc.parallelize(first_rows).toDF()

# DF2 with 1 row
DF2 = sc.parallelize([Row(dept_id='3', dept_info={'name': 'R&D', 'loc': 'Suwon'})]).toDF()

# unionByName appends DF2's rows, matching columns by name rather than position.
stacked = DF1.unionByName(DF2)
stacked.show(truncate=False)

+-------+---------------------------+
|dept_id|dept_info                  |
+-------+---------------------------+
|1      |{name -> CS, loc -> Seoul} |
|2      |{name -> CS, loc -> Busan} |
|3      |{name -> R&D, loc -> Suwon}|
+-------+---------------------------+



### Remove an existing column

In [None]:
from pyspark.sql import Row

dept_rows = [
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Busan'}),
    Row(dept_id='3', dept_info={'name': 'R&D', 'loc': 'Suwon'}),
]
DF = sc.parallelize(dept_rows).toDF()

# drop() returns a new DataFrame without dept_id; DF itself is unchanged.
DF.drop(DF.dept_id).show(truncate=False)

+---------------------------+
|dept_info                  |
+---------------------------+
|{name -> CS, loc -> Seoul} |
|{name -> CS, loc -> Busan} |
|{name -> R&D, loc -> Suwon}|
+---------------------------+



### Extract rows that match a condition

In [None]:
from pyspark.sql import Row

dept_rows = [
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Busan'}),
    Row(dept_id='3', dept_info={'name': 'R&D', 'loc': 'Suwon'}),
]
DF = sc.parallelize(dept_rows).toDF()

# Flatten the nested map into top-level columns so they can be filtered directly.
DF_flatten = DF.select('dept_id', 'dept_info.name', 'dept_info.loc')

# where() is an alias of filter(): keep only rows whose name is "CS".
is_cs = DF_flatten["name"] == "CS"
DF_flatten.where(is_cs).show()

+-------+----+-----+
|dept_id|name|  loc|
+-------+----+-----+
|      1|  CS|Seoul|
|      2|  CS|Busan|
+-------+----+-----+



### Other functions

In [None]:
from pyspark.sql import Row

dept_rows = [
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Busan'}),
    Row(dept_id='3', dept_info={'name': 'R&D', 'loc': 'Suwon'}),
]
DF = sc.parallelize(dept_rows).toDF()

# Flatten the nested map so "name" is a grouping key.
DF_flatten = DF.select("dept_id", "dept_info.name", "dept_info.loc")

# Number of rows per department name.
name_counts = DF_flatten.groupBy(DF_flatten["name"]).count()
name_counts.show()

+----+-----+
|name|count|
+----+-----+
|  CS|    2|
| R&D|    1|
+----+-----+



In [None]:
from pyspark.sql import Row

DF1 = sc.parallelize([
    Row(dept_id='1', name='CS', loc='Suwon'),
    Row(dept_id='2', name='CS', loc='Busan')]).toDF()

DF2 = sc.parallelize([
    Row(dept_id='3', name='R&D', loc='Seoul'),
    Row(dept_id='4', name='R&D', loc='Busan')]).toDF()

# Inner join on a column expression. Because the condition is an expression rather
# than a column name, the columns of both inputs are kept, so dept_id, name and loc
# each appear twice in the result.
same_loc = DF1["loc"] == DF2["loc"]
DF_join = DF1.join(DF2, on=same_loc, how="inner")
DF_join.show()

+-------+----+-----+-------+----+-----+
|dept_id|name|  loc|dept_id|name|  loc|
+-------+----+-----+-------+----+-----+
|      2|  CS|Busan|      4| R&D|Busan|
+-------+----+-----+-------+----+-----+



In [None]:
from pyspark.sql import Row

dept_rows = [
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Busan'}),
    Row(dept_id='3', dept_info={'name': 'R&D', 'loc': 'Suwon'}),
]
DF = sc.parallelize(dept_rows).toDF()

# Flatten the nested map into top-level columns.
DF_flatten = DF.select('dept_id', 'dept_info.name', 'dept_info.loc')

# .rdd exposes the DataFrame as an RDD of Row objects; keep each row's "name" field.
RDD_from_DF = DF_flatten.rdd.map(lambda row: row["name"])
print(RDD_from_DF.collect())

['CS', 'CS', 'R&D']


In [None]:
from pyspark.sql import Row

DF = sc.parallelize([
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Busan'}),
    Row(dept_id='3', dept_info={'name': 'R&D', 'loc': 'Seoul'})]).toDF()

# createTempView raises an error if a view named "temp_table" already exists,
# so re-running this cell fails until that view is dropped.
DF.createTempView("temp_table")

# Use single quotes for the string literal. In ANSI SQL, double quotes denote an
# identifier, and Spark treats them the same way when ANSI mode and
# spark.sql.ansi.doubleQuotedIdentifiers are enabled, so "Seoul" could be read as a
# column name.
sqlDF = spark.sql("""
    SELECT dept_info.name
    FROM temp_table
    WHERE dept_info.loc = 'Seoul'
""")
sqlDF.show()

+----+
|name|
+----+
|  CS|
| R&D|
+----+



In [None]:
# Remove the "temp_table" view registered earlier with createTempView; the call
# returns True when a view was dropped.
spark.catalog.dropTempView("temp_table")

True

In [None]:
from pyspark.sql import Row

DF = sc.parallelize([
    Row(dept_id='1', dept_info={'name': 'CS', 'loc': 'Seoul'}),
    Row(dept_id='2', dept_info={'name': 'CS', 'loc': 'Busan'}),
    Row(dept_id='3', dept_info={'name': 'R&D', 'loc': 'Seoul'})]).toDF()

# createOrReplaceTempView replaces any existing view with the same name, so this
# cell can be re-run safely.
DF.createOrReplaceTempView("temp_table")

# Use single quotes for the string literal. In ANSI SQL, double quotes denote an
# identifier, and Spark treats them the same way when ANSI mode and
# spark.sql.ansi.doubleQuotedIdentifiers are enabled.
sqlDF = spark.sql("""
    SELECT dept_info.name
    FROM temp_table
    WHERE dept_info.loc = 'Seoul'
""")
sqlDF.show()

+----+
|name|
+----+
|  CS|
| R&D|
+----+



## Spark ML library - Clustering

K-means clustering algorithm
1. Initialize the centroids
2. Assign each data point to the cluster of its nearest centroid
3. Move each centroid to the mean of the data points assigned to it
4. Repeat steps 2 and 3 until the centroids no longer change (or the iteration limit is reached)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName("KMeansPractice").getOrCreate()

columns = ["feature1", "feature2"]
# Two visibly separated groups of 2-D points: four near the origin, four around (9, 9).
data = [
    [0.0, 0.1], [1.0, 1.0], [0.5, 0.6], [0.5, 2.0],
    [9.0, 8.0], [8.0, 9.0], [9.0, 9.5], [10.0, 10.0],
]

df = spark.createDataFrame(data, columns)
df.show()

+--------+--------+
|feature1|feature2|
+--------+--------+
|     0.0|     0.1|
|     1.0|     1.0|
|     0.5|     0.6|
|     0.5|     2.0|
|     9.0|     8.0|
|     8.0|     9.0|
|     9.0|     9.5|
|    10.0|    10.0|
+--------+--------+



In [None]:
# Pack the numeric columns into the single vector column that Spark ML expects.
assembler = VectorAssembler(inputCols=columns, outputCol="features")

# df is reassigned in place. Drop any "features" column left by a previous run first,
# because VectorAssembler refuses to write to an existing output column. drop() is a
# no-op when the column is absent, so the first run is unaffected.
df = assembler.transform(df.drop("features"))
df.show()

+--------+--------+-----------+
|feature1|feature2|   features|
+--------+--------+-----------+
|     0.0|     0.1|  [0.0,0.1]|
|     1.0|     1.0|  [1.0,1.0]|
|     0.5|     0.6|  [0.5,0.6]|
|     0.5|     2.0|  [0.5,2.0]|
|     9.0|     8.0|  [9.0,8.0]|
|     8.0|     9.0|  [8.0,9.0]|
|     9.0|     9.5|  [9.0,9.5]|
|    10.0|    10.0|[10.0,10.0]|
+--------+--------+-----------+



### Train the K-means model
- KMeans(featuresCol, predictionCol, k, maxIter, distanceMeasure)
    - featuresCol: Features column name.
    - predictionCol: Prediction column name.
    - k: The number of clusters to create.
    - maxIter: Maximum number of iterations.
    - distanceMeasure: The distance measure. (euclidean/cosine)


In [None]:
# k=2 matches the two groups in the sample data. KMeans initialises its centroids
# randomly, so a fixed seed makes the centroids and the cluster ids (0/1)
# reproducible across runs.
kmeans = KMeans(featuresCol='features',
                predictionCol='prediction',
                k=2,
                maxIter=20,
                distanceMeasure='euclidean',
                seed=2023)

model = kmeans.fit(df)

In [None]:
# One centre per cluster, as learned by the fitted model.
centroids = model.clusterCenters()

# Attach each point's cluster id, then keep only the columns worth showing.
scored = model.transform(df)
predictions = scored.select("features", "prediction")

In [None]:
print("Cluster centroids:")
for center_text in map(str, centroids):
    print(center_text)

print("Result:")
predictions.show()

Cluster centroids:
[9.    9.125]
[0.5   0.925]
Result:
+-----------+----------+
|   features|prediction|
+-----------+----------+
|  [0.0,0.1]|         1|
|  [1.0,1.0]|         1|
|  [0.5,0.6]|         1|
|  [0.5,2.0]|         1|
|  [9.0,8.0]|         0|
|  [8.0,9.0]|         0|
|  [9.0,9.5]|         0|
|[10.0,10.0]|         0|
+-----------+----------+



## Spark ML library - Classification

### Logistic Regression

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName("IrisLogisticRegression").getOrCreate()

# header: the first line holds column names; inferSchema: derive numeric types from the data.
iris_data = spark.read.csv('Iris.csv', header=True, inferSchema=True)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from itertools import chain

from pyspark.sql.functions import create_map, lit

# Species name -> integer class index used as the classification label.
idx_dict = {'Iris-setosa': 0,
            'Iris-versicolor': 1,
            'Iris-virginica': 2}

# Use a literal map column instead of a Python UDF, so the lookup runs inside Spark
# without shipping every row to a Python worker. As with dict.get in a UDF, names
# missing from idx_dict map to null.
label_mapping_expr = create_map([lit(x) for x in chain(*idx_dict.items())])
iris_data = iris_data.withColumn("Species", label_mapping_expr[iris_data["Species"]])

# Fail fast instead of training on null labels if any Species value was not found
# in idx_dict.
n_unmapped = iris_data.filter(iris_data["Species"].isNull()).count()
assert n_unmapped == 0, f"{n_unmapped} rows have an unrecognised Species value"

iris_data.show()

+---+-------------+------------+-------------+------------+-------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|
+---+-------------+------------+-------------+------------+-------+
|  1|          5.1|         3.5|          1.4|         0.2|      0|
|  2|          4.9|         3.0|          1.4|         0.2|      0|
|  3|          4.7|         3.2|          1.3|         0.2|      0|
|  4|          4.6|         3.1|          1.5|         0.2|      0|
|  5|          5.0|         3.6|          1.4|         0.2|      0|
|  6|          5.4|         3.9|          1.7|         0.4|      0|
|  7|          4.6|         3.4|          1.4|         0.3|      0|
|  8|          5.0|         3.4|          1.5|         0.2|      0|
|  9|          4.4|         2.9|          1.4|         0.2|      0|
| 10|          4.9|         3.1|          1.5|         0.1|      0|
| 11|          5.4|         3.7|          1.5|         0.2|      0|
| 12|          4.8|         3.4|          1.6|  

In [None]:
# Assemble the features into a vector column and name the column to "features"
assembler = VectorAssembler(inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
                            outputCol="features")

# Rename the target column to "label"
iris_data = assembler.transform(iris_data).select("features", "Species").withColumnRenamed("Species", "label")

In [None]:
# 80/20 train/test split; the fixed seed keeps the split reproducible.
train_data, test_data = iris_data.randomSplit(weights=[0.8, 0.2], seed=2023)

In [None]:
from pyspark.ml.classification import LogisticRegression

# Three-class classifier on the assembled features; maxIter caps the optimiser's iterations.
logistic_regression = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
    maxIter=100,
)

model = logistic_regression.fit(train_data)

In [None]:
# Display the fitted model's repr (uid, number of classes, number of features).
model

LogisticRegressionModel: uid=LogisticRegression_06b914faa6d6, numClasses=3, numFeatures=4

In [None]:
# Score the held-out rows; this adds rawPrediction, probability and prediction columns.
predictions = model.transform(dataset=test_data)

In [None]:
# Show the first rows of the scored test set (Spark's default row count, truncated cells).
predictions.show()

+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[4.7,3.2,1.3,0.2]|    0|[9438.32169347803...|       [1.0,0.0,0.0]|       0.0|
|[4.9,3.1,1.5,0.1]|    0|[8794.92750919444...|       [1.0,0.0,0.0]|       0.0|
|[5.0,3.6,1.4,0.2]|    0|[9604.88573039206...|       [1.0,0.0,0.0]|       0.0|
|[5.4,3.9,1.7,0.4]|    0|[8318.84741194077...|       [1.0,0.0,0.0]|       0.0|
|[5.5,2.4,3.7,1.0]|    1|[-711.8096549038,...|       [0.0,1.0,0.0]|       1.0|
|[5.5,2.5,4.0,1.3]|    1|[-1965.4704489795...|       [0.0,1.0,0.0]|       1.0|
|[5.6,3.0,4.1,1.3]|    1|[-1128.9673545014...|       [0.0,1.0,0.0]|       1.0|
|[5.6,3.0,4.5,1.5]|    1|[-2440.3856093591...|       [0.0,1.0,0.0]|       1.0|
|[5.7,2.8,4.1,1.3]|    1|[-1826.3269796827...|       [0.0,1.0,0.0]|       1.0|
|[5.8,2.7,4.1,1.0]|    1|[-1258.7122062005...|      

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy: the fraction of test rows whose predicted class equals the true label.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

accuracy = evaluator.evaluate(dataset=predictions)
print(f"Accuracy: {accuracy}")

Accuracy: 0.9583333333333334
