In [17]:
import os
os.environ["HADOOP_HOME"] = "/home/hadoop/hadoop2"
os.environ["SPARK_HOME"] = "/home/hadoop/spark-2.3.1-bin-hadoop2.7"

In [18]:
!echo $HADOOP_HOME
!echo $SPARK_HOME
!echo $JAVA_HOME

/home/hadoop/hadoop2
/home/hadoop/spark-2.3.1-bin-hadoop2.7
/usr/lib/jvm/java-1.8.0-openjdk-amd64


In [19]:
import findspark
findspark.init()

In [21]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

# Data Loading And Viewing

In [29]:
dataCSV =  spark.read.option("header",True).option("delimiter",",").option("inferSchema",True).csv("file:///home/hadoop/SparkPracs/auto-data.csv")
#the file path should be of local else it will take the hdfs file location

In [30]:
dataCSV.show(5)

+----------+--------+------+-----+---------+-----+---------+---+----+--------+-------+-----+
|      MAKE|FUELTYPE|ASPIRE|DOORS|     BODY|DRIVE|CYLINDERS| HP| RPM|MPG-CITY|MPG-HWY|PRICE|
+----------+--------+------+-----+---------+-----+---------+---+----+--------+-------+-----+
|    subaru|     gas|   std|  two|hatchback|  fwd|     four| 69|4900|      31|     36| 5118|
| chevrolet|     gas|   std|  two|hatchback|  fwd|    three| 48|5100|      47|     53| 5151|
|     mazda|     gas|   std|  two|hatchback|  fwd|     four| 68|5000|      30|     31| 5195|
|    toyota|     gas|   std|  two|hatchback|  fwd|     four| 62|4800|      35|     39| 5348|
|mitsubishi|     gas|   std|  two|hatchback|  fwd|     four| 68|5500|      37|     41| 5389|
+----------+--------+------+-----+---------+-----+---------+---+----+--------+-------+-----+
only showing top 5 rows



In [31]:
dataCSV.printSchema()

root
 |-- MAKE: string (nullable = true)
 |-- FUELTYPE: string (nullable = true)
 |-- ASPIRE: string (nullable = true)
 |-- DOORS: string (nullable = true)
 |-- BODY: string (nullable = true)
 |-- DRIVE: string (nullable = true)
 |-- CYLINDERS: string (nullable = true)
 |-- HP: integer (nullable = true)
 |-- RPM: integer (nullable = true)
 |-- MPG-CITY: integer (nullable = true)
 |-- MPG-HWY: integer (nullable = true)
 |-- PRICE: integer (nullable = true)



In [35]:
jsonData = spark.read.option("inferSchema",True).json("file:///home/hadoop/SparkPracs/customerData.json")

In [36]:
jsonData.show(5)

+---+------+------+-----------------+------+
|age|deptid|gender|             name|salary|
+---+------+------+-----------------+------+
| 32|   100|  male|Benjamin Garrison|  3000|
| 40|   200|  male|    Holland Drake|  4500|
| 26|   100|  male|  Burks Velasquez|  2700|
| 51|   100|female|    June Rutledge|  4300|
| 44|   200|  male|    Nielsen Knapp|  6500|
+---+------+------+-----------------+------+



In [37]:
jsonData.printSchema()

root
 |-- age: string (nullable = true)
 |-- deptid: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)



In [38]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DoubleType

In [39]:
schemaCustData = StructType(
    [
        StructField("age",IntegerType(),True),
        StructField("deptid",IntegerType(),True),
        StructField("gender",StringType(),True),
        StructField("name",StringType(),True),
        StructField("salary",DoubleType(),True),
    ]
)
#true supports null

In [40]:
jsonDataWithSchema = spark.read.schema(schemaCustData).json("file:///home/hadoop/SparkPracs/customerData.json")

In [41]:
jsonDataWithSchema.show(5)

+----+------+------+-----------------+------+
| age|deptid|gender|             name|salary|
+----+------+------+-----------------+------+
|  32|   100|  male|Benjamin Garrison|3000.0|
|  40|   200|  male|    Holland Drake|4500.0|
|  26|   100|  male|  Burks Velasquez|2700.0|
|null|  null|  null|             null|  null|
|  44|   200|  male|    Nielsen Knapp|6500.0|
+----+------+------+-----------------+------+



In [42]:
jsonDataWithSchema.printSchema()

root
 |-- age: integer (nullable = true)
 |-- deptid: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: double (nullable = true)



In [43]:
#to try database
jsonData.registerTempTable("customerdata")

In [44]:
spark.sql("select * from customerdata").show()

+---+------+------+-----------------+------+
|age|deptid|gender|             name|salary|
+---+------+------+-----------------+------+
| 32|   100|  male|Benjamin Garrison|  3000|
| 40|   200|  male|    Holland Drake|  4500|
| 26|   100|  male|  Burks Velasquez|  2700|
| 51|   100|female|    June Rutledge|  4300|
| 44|   200|  male|    Nielsen Knapp|  6500|
+---+------+------+-----------------+------+



In [45]:
spark.sql("select name from customerdata where salary>=5000").show()

+-------------+
|         name|
+-------------+
|Nielsen Knapp|
+-------------+



In [48]:
#function Implementation
from pyspark.sql.functions import col
#Select Query
jsonData.select(col("name"),col("salary")).show()

+-----------------+------+
|             name|salary|
+-----------------+------+
|Benjamin Garrison|  3000|
|    Holland Drake|  4500|
|  Burks Velasquez|  2700|
|    June Rutledge|  4300|
|    Nielsen Knapp|  6500|
+-----------------+------+



In [49]:
jsonData.filter(jsonData.age == 40).show()

+---+------+------+-------------+------+
|age|deptid|gender|         name|salary|
+---+------+------+-------------+------+
| 40|   200|  male|Holland Drake|  4500|
+---+------+------+-------------+------+



In [50]:
jsonData.filter(jsonData.age.between(30,40)).show()

+---+------+------+-----------------+------+
|age|deptid|gender|             name|salary|
+---+------+------+-----------------+------+
| 32|   100|  male|Benjamin Garrison|  3000|
| 40|   200|  male|    Holland Drake|  4500|
+---+------+------+-----------------+------+



In [51]:
jsonData.filter(jsonData.age >= 25).show()

+---+------+------+-----------------+------+
|age|deptid|gender|             name|salary|
+---+------+------+-----------------+------+
| 32|   100|  male|Benjamin Garrison|  3000|
| 40|   200|  male|    Holland Drake|  4500|
| 26|   100|  male|  Burks Velasquez|  2700|
| 51|   100|female|    June Rutledge|  4300|
| 44|   200|  male|    Nielsen Knapp|  6500|
+---+------+------+-----------------+------+



In [52]:
#machine learning
from pyspark.sql.types import StructType,StructField,DoubleType

In [53]:
salarySchema = StructType([
    StructField("yearsExperience",DoubleType(),True),
    StructField("salary",DoubleType(),True)
])

In [55]:
#Load the data 
salaryData = spark.read.schema(salarySchema).option("header",True).csv("file:///home/hadoop/SparkPracs/Salary_Data.csv")

In [56]:
salaryData.show(5)

+---------------+-------+
|yearsExperience| salary|
+---------------+-------+
|            1.1|39343.0|
|            1.3|46205.0|
|            1.5|37731.0|
|            2.0|43525.0|
|            2.2|39891.0|
+---------------+-------+
only showing top 5 rows



In [59]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ["yearsExperience"], outputCol = "features")

In [60]:
features = vectorAssembler.transform(salaryData)

In [61]:
features.show(1)

+---------------+-------+--------+
|yearsExperience| salary|features|
+---------------+-------+--------+
|            1.1|39343.0|   [1.1]|
+---------------+-------+--------+
only showing top 1 row



In [62]:
from pyspark.ml.regression import LinearRegression
lrModel = LinearRegression(featuresCol = "features", labelCol ="salary")

In [63]:
finalModel = lrModel.fit(features)