<a href="https://colab.research.google.com/github/Kzis/my-saprk/blob/main/ttba_spark_lab2_lab3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder \
   .appName("Neural Network Model") \
   .config("spark.executor.memory", "3gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [6]:
sc

# Lab 2

In [7]:
from pyspark.sql.functions import *

# Define a dataset.

df = sc.parallelize([
    (10, '', 10000), (20, 'Female', 30000), (None, 'Male', 80000), (None, 'Male', 5000)
]).toDF(["age", "gender", "income"])

df.show()


+----+------+------+
| age|gender|income|
+----+------+------+
|  10|      | 10000|
|  20|Female| 30000|
|null|  Male| 80000|
|null|  Male|  5000|
+----+------+------+



In [8]:
df.describe().show()


+-------+------------------+------+-----------------+
|summary|               age|gender|           income|
+-------+------------------+------+-----------------+
|  count|                 2|     4|                4|
|   mean|              15.0|  null|          31250.0|
| stddev|7.0710678118654755|  null|34247.87098005753|
|    min|                10|      |             5000|
|    max|                20|  Male|            80000|
+-------+------------------+------+-----------------+



# Data Cleansing: Null


In [9]:
# Treat Null Value (None) with Average one.

avg_age = df.na.drop().agg(avg("age")).collect()[0][0]

sparkf_replaceNull = udf(lambda x: avg_age if x == None else x)

no_null_df = df.withColumn('age', sparkf_replaceNull(col('age')))

no_null_df.show()


+----+------+------+
| age|gender|income|
+----+------+------+
|  10|      | 10000|
|  20|Female| 30000|
|15.0|  Male| 80000|
|15.0|  Male|  5000|
+----+------+------+



# Data Cleansing: Empty Values


In [11]:
#  Treat Missing Value with Defined Values.

from pyspark.sql.functions import *

treat_missing = udf(lambda x: "Male_Assume" if x == "" else x)

no_missing_df = no_null_df.withColumn('new_gender',treat_missing(no_null_df.gender))

no_missing_df.show()



+----+------+------+-----------+
| age|gender|income| new_gender|
+----+------+------+-----------+
|  10|      | 10000|Male_Assume|
|  20|Female| 30000|     Female|
|15.0|  Male| 80000|       Male|
|15.0|  Male|  5000|       Male|
+----+------+------+-----------+



# Data Cleansing: Outliers (Business-oriented)




In [12]:
# Treat Outliner with Remove one.

no_outlier_df = no_missing_df.filter(col('income') >= 10000)

no_outlier_df .show()


+----+------+------+-----------+
| age|gender|income| new_gender|
+----+------+------+-----------+
|  10|      | 10000|Male_Assume|
|  20|Female| 30000|     Female|
|15.0|  Male| 80000|       Male|
+----+------+------+-----------+



# Lab 3

In [13]:
no_outlier_df.registerTempTable('cleaned_table')


In [14]:
spark.sql('select * from cleaned_table').show()


+----+------+------+-----------+
| age|gender|income| new_gender|
+----+------+------+-----------+
|  10|      | 10000|Male_Assume|
|  20|Female| 30000|     Female|
|15.0|  Male| 80000|       Male|
+----+------+------+-----------+



In [15]:
spark.sql('select age, count(*) from cleaned_table group by age').show()


+----+--------+
| age|count(1)|
+----+--------+
|15.0|       1|
|  20|       1|
|  10|       1|
+----+--------+



In [16]:
spark.sql('select age, count(*) as countAge from cleaned_table group by age having age >=15 order by age desc').show()

+----+--------+
| age|countAge|
+----+--------+
|  20|       1|
|15.0|       1|
+----+--------+



In [17]:
planDict_list = [{'age_plan':15,'plan':'Plan A'},{'age_plan':20,'plan':'Plan B'}]


In [18]:
plan_df = spark.createDataFrame(planDict_list)
plan_df.show()




+--------+------+
|age_plan|  plan|
+--------+------+
|      15|Plan A|
|      20|Plan B|
+--------+------+



In [19]:
plan_df.registerTempTable('plan_table')


In [20]:
spark.sql('select * from cleaned_table \
JOIN plan_table ON cleaned_table.age == plan_table.age_plan').show()


+----+------+------+----------+--------+------+
| age|gender|income|new_gender|age_plan|  plan|
+----+------+------+----------+--------+------+
|15.0|  Male| 80000|      Male|      15|Plan A|
|  20|Female| 30000|    Female|      20|Plan B|
+----+------+------+----------+--------+------+



In [21]:
spark.sql('select * from cleaned_table \
JOIN plan_table ON cleaned_table.age == plan_table.age_plan \
WHERE age between 10 and 15').show()


+----+------+------+----------+--------+------+
| age|gender|income|new_gender|age_plan|  plan|
+----+------+------+----------+--------+------+
|15.0|  Male| 80000|      Male|      15|Plan A|
+----+------+------+----------+--------+------+

