In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("cap_stone").getOrCreate()
import pyspark.sql.functions as psf

## Imporing Data

In [3]:
Txn = spark.read.options(inferschema = 'True',delimiter = ',',header = 'True')\
      .csv('Car_sales_transactions.csv')

In [4]:
Txn.printSchema()

root
 |-- Sales_ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Selling_Price: integer (nullable = true)
 |-- km_Driven: integer (nullable = true)
 |-- City_Code: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Postal_Code: integer (nullable = true)
 |-- Fuel: string (nullable = true)
 |-- Seller_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Owner: string (nullable = true)
 |-- Mileage: string (nullable = true)
 |-- Engine: string (nullable = true)
 |-- Max_Power: string (nullable = true)
 |-- Seats: integer (nullable = true)



In [5]:
City_Master = spark.read.options(inferschema = 'True',delimiter = ',',header = 'True')\
      .csv('City_Master.csv')
Postal_Code_Master = spark.read.options(inferschema = 'True',delimiter = ',',header = 'True')\
      .csv('Postal_Code_Master.csv')
Region_Master = spark.read.options(inferschema = 'True',delimiter = ',',header = 'True')\
      .csv('Region_Master.csv')
Region_State_Mapping = spark.read.options(inferschema = 'True',delimiter = ',',header = 'True')\
      .csv('Region_State_Mapping.csv')
Sales_Status = spark.read.options(inferschema = 'True',delimiter = ',',header = 'True')\
      .csv('Sales_Status.csv')
State_Master = spark.read.options(inferschema = 'True',delimiter = ',',header = 'True')\
      .csv('State_Master.csv')

In [6]:
Postal_Code_Master.rdd.getNumPartitions()

1

In [7]:
Region_Master.printSchema()

root
 |-- Region_Code: string (nullable = true)
 |-- Region: string (nullable = true)



## Joining data

In [8]:
Txn = Txn.join(City_Master,"City_Code","left").join(Region_State_Mapping,"State_Code","left")\
   .join(Sales_Status,"Sales_ID","left").join(State_Master,"State_Code","left")\
   .join(Region_Master,"Region_Code","left")

In [9]:
Txn.printSchema()

root
 |-- Region_Code: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Sales_ID: integer (nullable = true)
 |-- City_Code: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Selling_Price: integer (nullable = true)
 |-- km_Driven: integer (nullable = true)
 |-- Postal_Code: integer (nullable = true)
 |-- Fuel: string (nullable = true)
 |-- Seller_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Owner: string (nullable = true)
 |-- Mileage: string (nullable = true)
 |-- Engine: string (nullable = true)
 |-- Max_Power: string (nullable = true)
 |-- Seats: integer (nullable = true)
 |-- City_Name: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Sold: string (nullable = true)
 |-- State_Name: string (nullable = true)
 |-- Region: string (nullable = true)



## Extracting Imp Information From columns

In [10]:
Split_Mileage = pyspark.sql.functions.split(Txn['Mileage']," ")
#Split_Power = Txn.select(split("Max_Power"," ").as("a")).drop("Max_Power")
Split_Power = psf.split(Txn["Max_Power"]," ")
Split_Engine = psf.split(Txn["Engine"]," ")
Split_Name = psf.split(Txn['Name']," ")

In [11]:
Txn1 = Txn.withColumn('Mileage',Split_Mileage.getItem(0).cast("integer"))\
        .withColumn('Max_Power',Split_Power.getItem(0).cast("integer"))\
        .withColumn('Engine',Split_Engine.getItem(0).cast("integer"))\
        .withColumn('Brand',Split_Name.getItem(0))\
        .withColumn('Model',Split_Name.getItem(1))

## Dropping Unnecessary columns

In [12]:
Txn2 = Txn1.drop('Region_Code').drop('State_Code').drop('City_Code').drop('Name')

In [13]:
Txn2.printSchema()

root
 |-- Sales_ID: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Selling_Price: integer (nullable = true)
 |-- km_Driven: integer (nullable = true)
 |-- Postal_Code: integer (nullable = true)
 |-- Fuel: string (nullable = true)
 |-- Seller_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Owner: string (nullable = true)
 |-- Mileage: integer (nullable = true)
 |-- Engine: integer (nullable = true)
 |-- Max_Power: integer (nullable = true)
 |-- Seats: integer (nullable = true)
 |-- City_Name: string (nullable = true)
 |-- Sold: string (nullable = true)
 |-- State_Name: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Model: string (nullable = true)



In [14]:
Txn2.select('Sold').show()

+----+
|Sold|
+----+
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
|   Y|
+----+
only showing top 20 rows



In [15]:
Txn2 = Txn2.withColumn("Brand", psf.when(Txn2.Brand == "Land","Land Rover").otherwise(Txn2.Brand))

In [16]:
Txn2.select("Brand").where(Txn2.Brand == "Land Rover").show()

+----------+
|     Brand|
+----------+
|Land Rover|
|Land Rover|
|Land Rover|
|Land Rover|
|Land Rover|
|Land Rover|
+----------+



In [22]:
Txn2.write.csv("Azure",header = True)

Py4JJavaError: An error occurred while calling o166.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:209)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:851)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:343)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:344)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:901)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 22 more


## filtering Mileage=0

In [23]:
Txn3 = Txn2.filter(Txn2["Mileage"] != 0)

In [24]:
Txn3.count()

7889

## Segregating Sold and unsold cars

In [25]:
sold = Txn2.filter(Txn2['Sold'] == 'Y')
un_sold = Txn2.filter(Txn2['Sold'] == 'N')

## Data sets

In [None]:
# Txn2 = Cleaned Master Data

In [None]:
# Txn3 = Data for mileage analysis (filtered Txn2 with Condition Mileage != 0)

In [None]:
# sold = Data of sold cars from Txn2
# un_sold = Data of Unsold Cars from Txn2

In [29]:
Txn2.rdd.getNumPartitions()

1

## Q3 Brand Wise sales

In [209]:
Brand_Wise = sold.groupby("Brand").agg(count("Sales_ID").alias("Count"),avg("Selling_Price").alias("Avg_Selling_Price"))\
     .sort(psf.desc("Count"))

In [210]:
Brand_Wise.show()

+-------------+-----+------------------+
|        Brand|Count| Avg_Selling_Price|
+-------------+-----+------------------+
|       Maruti|  597| 405231.9631490787|
|      Hyundai|  332|          463259.0|
|     Mahindra|  201| 558303.4278606966|
|         Tata|  178|  369480.297752809|
|       Toyota|  126| 1085555.507936508|
|        Honda|  124| 582581.8225806452|
|         Ford|   92| 562228.2282608695|
|      Renault|   56|419035.66071428574|
|    Chevrolet|   56|279499.96428571426|
|   Volkswagen|   39| 492487.1538461539|
|       Nissan|   33|420969.63636363635|
|          BMW|   29| 4606551.724137931|
|       Jaguar|   21|2917857.1428571427|
|        Skoda|   20|          747749.9|
|        Volvo|   19|3547368.4210526315|
|Mercedes-Benz|   16|         2767187.5|
|       Datsun|   16|       297249.9375|
|        Lexus|   11|         5150000.0|
|         Jeep|    9|2551666.5555555555|
|         Audi|    9|2637777.6666666665|
+-------------+-----+------------------+
only showing top

## Q4 Location Wise

In [207]:
Location_Wise = sold.groupby("Region","State_Name").agg(count("Sales_ID").alias("Count")).sort(psf.desc("Count"))

In [208]:
Location_Wise.show()

+----------+--------------+-----+
|    Region|    State_Name|Count|
+----------+--------------+-----+
|     South|    Tamil Nadu|  230|
|     South|     Karnataka|  223|
|      West|       Gujarat|  208|
|      West|   Maharashtra|  189|
|   Central| Uttar Pradesh|  145|
|     South|        Kerala|  130|
|      East|     Jharkhand|  127|
|     North|         Delhi|   80|
|     South|     Telangana|   72|
|   Central|   Uttarakhand|   70|
|   Central|Madhya Pradesh|   69|
|      East|   West Bengal|   69|
|North East|        Sikkim|   69|
|      East|         Bihar|   69|
|     South|Andhra Pradesh|   68|
|      East|        Odisha|   64|
|     North|        Punjab|   63|
|     North|     Rajasthan|   55|
+----------+--------------+-----+

