<a href="https://colab.research.google.com/github/bhagyapatel/Big-Data/blob/main/spark_basic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark Installation

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

# Point findspark/pyspark at the JDK installed by apt-get and at the unpacked Spark tarball.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.0.0-bin-hadoop3.2",
)


In [None]:
# findspark is already installed by the setup cell, so no second pip install is needed.
# init() locates Spark via the SPARK_HOME variable set above so that pyspark can be imported.
import findspark
findspark.init()

## RDD

In [None]:
from pyspark.sql import SparkSession

# Start (or fetch) the SparkSession used throughout the notebook.
spark = (
    SparkSession.builder
    .appName("rdd trial")
    .getOrCreate()
)

In [None]:
# Two sample rows (id, first name, last name, city) distributed over 2 partitions.
people_rows = [[1, "bhawana", "patel", "pune"], [2, "heena", "khan", "mumbai"]]
RDD1 = spark.sparkContext.parallelize(people_rows, numSlices=2)

In [None]:
# Bring every element of the RDD back to the driver as a Python list.
collected_rows = RDD1.collect()
collected_rows

[[1, 'bhawana', 'patel', 'pune'], [2, 'heena', 'khan', 'mumbai']]

In [None]:
# Convert the RDD to a DataFrame by naming its four positional fields.
RDD1.toDF(["id", "name", "lname", "city"]).show()

+---+-------+-----+------+
| id|   name|lname|  city|
+---+-------+-----+------+
|  1|bhawana|patel|  pune|
|  2|  heena| khan|mumbai|
+---+-------+-----+------+



## DataFrame Creation

In [None]:
# Parallel columns for the sample order table.
# NOTE: "quantitity" is a typo for "quantity", but later cells use this exact
# variable and column name, so it is kept as-is.
product, price, quantitity = (
    ["jacket", "hat", "shoe"],
    [1500, 300, 1000],
    [2, 3, 1],
)

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() returns the session already started in the RDD section, so the
# "First" app name presumably does not take effect here — verify if it matters.
spark = (
    SparkSession.builder
    .appName("First")
    .getOrCreate()
)

In [None]:
# Build the order DataFrame from the three parallel lists, one row per product.
columns = ["product", "price", "quantitity"]
order_rows = list(zip(product, price, quantitity))
df = spark.createDataFrame(order_rows, columns)

In [None]:
# Explicit imports instead of `import *`: the wildcard form shadows Python builtins
# such as sum, max, min, abs and round with their Spark column versions.
from pyspark.sql.functions import col, expr, when

# total_price = quantity * unit price, computed with a SQL expression.
df1 = df.withColumn("total_price", expr("quantitity * price"))
df1.show()

+-------+-----+----------+-----------+
|product|price|quantitity|total_price|
+-------+-----+----------+-----------+
| jacket| 1500|         2|       3000|
|    hat|  300|         3|        900|
|   shoe| 1000|         1|       1000|
+-------+-----+----------+-----------+



In [None]:
# Discount on total_price: 30% for jackets, 10% for hats, nothing for any other
# product (e.g. shoe).
from pyspark.sql.functions import col, lit, when

# Percentage discount per product; products missing from this dict get 0%.
DISCOUNT_PCT = {"jacket": 30, "hat": 10}

discount_pct = lit(0)
for product_name, pct in DISCOUNT_PCT.items():
    # Each condition tests a different product name, so nesting order is irrelevant.
    discount_pct = when(col("product") == product_name, pct).otherwise(discount_pct)

# Same arithmetic as per-branch total_price * pct / 100: the multiply stays integral
# and "/" returns a double, so the amounts remain exact (900.0, 90.0, 0.0).
df2 = df1.withColumn("Discount_amount", col("total_price") * discount_pct / 100)
df2.show()

+-------+-----+----------+-----------+---------------+
|product|price|quantitity|total_price|Discount_amount|
+-------+-----+----------+-----------+---------------+
| jacket| 1500|         2|       3000|          900.0|
|    hat|  300|         3|        900|           90.0|
|   shoe| 1000|         1|       1000|            0.0|
+-------+-----+----------+-----------+---------------+



In [None]:
# net_price = total_price minus the discount computed in the previous cell.
net_price_col = df2["total_price"] - df2["Discount_amount"]
df3 = df2.withColumn("net_price", net_price_col)
df3.show()

+-------+-----+----------+-----------+---------------+---------+
|product|price|quantitity|total_price|Discount_amount|net_price|
+-------+-----+----------+-----------+---------------+---------+
| jacket| 1500|         2|       3000|          900.0|   2100.0|
|    hat|  300|         3|        900|           90.0|    810.0|
|   shoe| 1000|         1|       1000|            0.0|   1000.0|
+-------+-----+----------+-----------+---------------+---------+



In [None]:
# Persist df3 as Parquet with one sub-directory per product value.
# mode("overwrite") makes the cell safe to re-run: the default save mode
# (errorifexists) fails as soon as the directory exists. The CSV-only "header"
# option is dropped because Parquet stores column names in its own schema.
(
    df3.write
    .mode("overwrite")
    .partitionBy("product")
    .parquet("/content/sample_data/df3_parquet")
)

In [None]:
# Same discount rule as above, written with col() throughout; displayed only, not assigned.
product_col = col("product")
total_col = col("total_price")
discount_rule = (
    when(product_col == "jacket", total_col * 30 / 100)
    .when(product_col == "hat", total_col * 10 / 100)
    .otherwise(total_col * 0 / 100)
)
df1.withColumn("Discount_amount", discount_rule).show()

+-------+-----+----------+-----------+---------------+
|product|price|quantitity|total_price|Discount_amount|
+-------+-----+----------+-----------+---------------+
| jacket| 1500|         2|       3000|          900.0|
|    hat|  300|         3|        900|           90.0|
|   shoe| 1000|         1|       1000|            0.0|
+-------+-----+----------+-----------+---------------+



In [None]:
# Read the partitioned Parquet directory written above back into a DataFrame.
df_p = spark.read.format("parquet").load("/content/sample_data/df3_parquet")

In [None]:
# product now appears last: it is the partition column, rebuilt from the directory names.
df_p.show(n=20)

+-----+----------+-----------+---------------+---------+-------+
|price|quantitity|total_price|Discount_amount|net_price|product|
+-----+----------+-----------+---------------+---------+-------+
|  300|         3|        900|           90.0|    810.0|    hat|
| 1500|         2|       3000|          900.0|   2100.0| jacket|
| 1000|         1|       1000|            0.0|   1000.0|   shoe|
+-----+----------+-----------+---------------+---------+-------+



In [None]:
# Keep only the rows whose net_price equals 2100 (select("*") was a no-op).
df11 = df_p.where(df_p["net_price"] == 2100)

In [None]:
# Append the filtered rows to a second Parquet directory, partitioned by product.
# The save mode must be set with .mode(); .option("mode", "append") is only passed
# to the Parquet source as a data-source option, so the write was silently using the
# default errorifexists mode and failed on re-run instead of appending.
df11.write.mode("append").partitionBy("product").parquet("/content/sample_data/df3_parquet1")

In [None]:
# Load the ';'-separated CSV with a header row. Without inferSchema every column is
# read as a string. NOTE: this rebinds df, which previously held the product table.
df = spark.read.csv("/content/test.csv", header=True, sep=";")

In [None]:
# Preview the first rows of the CSV data.
df.show(n=20, truncate=True)

+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|  blue-collar|married|secondary|     no|      0|    yes|  no| unknown| 

In [None]:
# Drop exact duplicate rows and count what remains (compare with df.count() below).
# Stored under its own name instead of reusing df1, which still holds the product
# table that the discount cells above read; clobbering it breaks re-running them.
df_dedup = df.dropDuplicates()
df_dedup.count()

4521

In [None]:
# Row count of the raw CSV, for comparison with the de-duplicated count above.
raw_row_count = df.count()
raw_row_count

4521

In [None]:
# Number of rows with no job value.
df.where(df["job"].isNull()).count()

0

In [None]:
# Kept as-is: cells further down may rely on names brought in by this wildcard import.
from pyspark.sql.functions import *

# Show balances with zero replaced by 1 (display only; df is not modified).
# The pattern is anchored: the unanchored pattern 0 rewrote every zero digit inside
# a number (e.g. 1350 -> 1351), corrupting non-zero balances.
df.withColumn("balance", expr("regexp_replace(balance, '^0$', '1')")).show()

+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35|   management| single| tertiary|     no|   1351|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|  blue-collar|married|secondary|     no|      1|    yes|  no| unknown| 

## GroupBy

In [None]:
# Row counts for every (contact, loan) combination.
d = df.groupBy(["contact", "loan"]).count()
d.show()

+---------+----+-----+
|  contact|loan|count|
+---------+----+-----+
| cellular|  no| 2447|
|  unknown|  no| 1126|
|telephone| yes|   44|
|  unknown| yes|  198|
|telephone|  no|  257|
| cellular| yes|  449|
+---------+----+-----+



In [None]:
# Number of distinct marital-status values.
df.select("marital").distinct().count()

3

In [None]:
# Number of distinct (marital, loan) pairs.
df.select(df["marital"], df["loan"]).distinct().count()

6

In [None]:
# sort() is a lazy transformation that returns a new DataFrame; without an action
# the cell only displayed the DataFrame repr. show() materialises a few sorted rows.
# df itself is not modified.
df.sort("education").show(5)

In [None]:
# Row counts per (education, housing, loan) combination. Ordering by all three
# grouping keys makes the output order deterministic; ordering by education alone
# left the rows within each education level in arbitrary order.
# NOTE: this rebinds df1.
df1 = (
    df.groupBy("education", "housing", "loan")
    .count()
    .orderBy("education", "housing", "loan")
)
df1.show()

+---------+-------+----+-----+
|education|housing|loan|count|
+---------+-------+----+-----+
|  primary|    yes| yes|   52|
|  primary|     no| yes|   42|
|  primary|     no|  no|  253|
|  primary|    yes|  no|  331|
|secondary|     no| yes|  165|
|secondary|    yes|  no| 1179|
|secondary|     no|  no|  711|
|secondary|    yes| yes|  251|
| tertiary|    yes|  no|  562|
| tertiary|     no|  no|  614|
| tertiary|    yes| yes|  101|
| tertiary|     no| yes|   73|
|  unknown|     no|  no|   99|
|  unknown|     no| yes|    5|
|  unknown|    yes| yes|    2|
|  unknown|    yes|  no|   81|
+---------+-------+----+-----+



In [None]:
from pyspark.sql.types import *

# Preview the schema of balance cast to int (df itself is unchanged).
df.select(df["balance"].cast(IntegerType()).alias("balance1")).printSchema()


root
 |-- balance1: integer (nullable = true)



In [None]:
# Replace the string balance column with its int cast so that aggregations such as
# max compare numbers rather than strings.
df = df.withColumn("balance", df["balance"].cast("int"))

In [None]:
# Highest balance within each marital status; result column is named max(balance).
df.groupBy("marital").agg({"balance": "max"}).show()

+--------+------------+
| marital|max(balance)|
+--------+------------+
|divorced|       26306|
| married|       71188|
|  single|       27733|
+--------+------------+



In [None]:
# Print the column names and types of df.
# NOTE(review): the saved output below lists balance as string even though an earlier
# cell casts it to int; it looks stale from out-of-order execution. Re-run the
# notebook top to bottom to refresh it.
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- campaign: string (nullable = true)
 |-- pdays: string (nullable = true)
 |-- previous: string (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



## Filter and Where

In [None]:
# How many married customers also have a loan (select("*") was a no-op).
df.where((df["loan"] == "yes") & (df["marital"] == "married")).count()

453

In [None]:
# Show the married customers who also have a loan.
df.where((df["loan"] == "yes") & (df["marital"] == "married")).show()