In [1]:
#Libraries Required
import os

import numpy as np  
import pandas as pd 

pd.set_option("display.max_rows", 2000)
pd.set_option("display.max_columns", 100)
import matplotlib.pyplot as plt
import seaborn as sns   
import pickle 

import warnings
warnings.filterwarnings("ignore")

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

# Start

In [24]:
spark

In [131]:
%ls

 Volume in drive C is OSDisk
 Volume Serial Number is D86A-C798

 Directory of c:\Users\akash.mathur\Downloads\Akash\Spark

08/14/2022  05:27 PM    <DIR>          .
08/14/2022  05:27 PM    <DIR>          ..
08/10/2022  07:56 PM         3,019,965 canada_forecast_NEW.parquet
08/12/2022  05:16 PM        65,699,395 LoanStats_2018Q4.csv
08/14/2022  05:27 PM         1,853,425 spark.docx
08/14/2022  05:26 PM            82,036 spark_practice.ipynb
               4 File(s)     70,654,821 bytes
               2 Dir(s)  24,078,413,824 bytes free


## Read Data

In [9]:
file_location = "LoanStats_2018Q4.csv"
file_type = "csv"

infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# df = spark.read.format('csv')\
#     .option("inferSchema", infer_schema)\
#     .option("header", first_row_is_header)\
#     .option("sep", delimiter)\
#     .load(file_location)
    
df = spark.read.format('csv')\
    .options(inferSchema= infer_schema,
    header= first_row_is_header,
    sep= delimiter)\
    .load(file_location)

# ALTERNATE
# spark.read.load("LoanStats_2018Q4.csv",
#                      format="csv", sep=";", inferSchema="true", header="true")

# spark.read.csv("LoanStats_2018Q4.csv",
#                      inferSchema="true", header="true")

In [5]:
display(df)

DataFrame[loan_amnt: int, funded_amnt: int, funded_amnt_inv: int, term: string, int_rate: string, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: int, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string]

In [6]:
df.count()

102

In [7]:
df.printSchema()

root
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: integer (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)



## Cached in memory (Persists the DataFrame with the default storage level (MEMORY_AND_DISK)

In [None]:
df1 = spark.read.format('csv')\
    .option("inferSchema", infer_schema)\
    .option("header", first_row_is_header)\
    .option("sep", delimiter)\
    .load(file_location).cache()

#OR
    
df1.cache()

## Enabling for Conversion to/from Pandas

Not recommended, because you will lose out on memory. If you need to just sample a few rows and convert to pandas

In [10]:
# import numpy as np
# import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas() #OR
df_pa = df.toPandas()

## Copy a DF

In [175]:
df1 = df.alias('df1')

In [11]:
# only applicable for Databricks
display(df)

DataFrame[loan_amnt: int, funded_amnt: int, funded_amnt_inv: int, term: string, int_rate: string, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: int, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string]

In [12]:
df.limit(5)

DataFrame[loan_amnt: int, funded_amnt: int, funded_amnt_inv: int, term: string, int_rate: string, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: int, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string]

In [18]:
df.head()

Row(loan_amnt=10000, funded_amnt=10000, funded_amnt_inv=10000, term=' 36 months', int_rate='10.33%', installment=324.23, grade='B', sub_grade='B1', emp_title=None, emp_length='< 1 year', home_ownership='MORTGAGE', annual_inc=280000, verification_status='Not Verified', issue_d='18-Dec', loan_status='Current', pymnt_plan='n')

In [19]:
df.count()

102

## Check DataTypes

In [20]:
df.dtypes

[('loan_amnt', 'int'),
 ('funded_amnt', 'int'),
 ('funded_amnt_inv', 'int'),
 ('term', 'string'),
 ('int_rate', 'string'),
 ('installment', 'double'),
 ('grade', 'string'),
 ('sub_grade', 'string'),
 ('emp_title', 'string'),
 ('emp_length', 'string'),
 ('home_ownership', 'string'),
 ('annual_inc', 'int'),
 ('verification_status', 'string'),
 ('issue_d', 'string'),
 ('loan_status', 'string'),
 ('pymnt_plan', 'string')]

In [21]:
df.printSchema()

root
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: integer (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)



## Select columns

In [22]:
## Collect (Returns all the records as a list of Row)

df.select(['emp_title', 'grade', 'sub_grade']).limit(2).collect()

[Row(emp_title=None, grade='B', sub_grade='B1'),
 Row(emp_title='Chef', grade='C', sub_grade='C1')]

In [29]:
df.select(['emp_title', 'grade', 'sub_grade']).show(2)

+---------+-----+---------+
|emp_title|grade|sub_grade|
+---------+-----+---------+
|     null|    B|       B1|
|     Chef|    C|       C1|
+---------+-----+---------+
only showing top 2 rows



In [30]:
df.select([col('emp_title'), col('grade'), col('sub_grade')]).show(2)

+---------+-----+---------+
|emp_title|grade|sub_grade|
+---------+-----+---------+
|     null|    B|       B1|
|     Chef|    C|       C1|
+---------+-----+---------+
only showing top 2 rows



In [39]:
# df['emp_title']#.show(2)
type(df['emp_title'])

pyspark.sql.column.Column

## Show Top N Rows in Spark/PySpark

### show()

In [70]:
# show() -> Print/Show top 20 rows in a tabular form
# show(n) -> Print/Show top N rows in a tabular form
df.show(2)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    10000|      10000|          10000| 36 months|  10.33%|     324.23|    B|       B1|     null|  < 1 year|      MORTGAGE|    280000|       Not Verified| 18-Dec|    Current|         n|
|     2500|       2500|           2500| 36 months|  13.56%|      84.92|    C|       C1|     Chef| 10+ years|          RENT|     55000|       Not Verified| 18-Dec|    Current|         n|
+---------+-----------+---------------+----------+--------+-----------

### take/first

In [76]:
# take() Returns top N row.	PySpark – Return list of Row **ACTION**
# first() Returns the first row	PySpark – Return list of Row **ACTION**

# head()	Returns the first row	PySpark – Return list of Row **ACTION**
# head(n)	Returns top N rows PySpark – Return list of Row **ACTION**
# tail(n)	Returns Last N rows	PySpark – Return list of class Row **ACTION**

# collect()	Returns all dataset	PySpark – Return All as a list of Row **ACTION**

# limit(n)	Returns Top N rows	PySpark – Returns a new DataFrame **Transformation**

# Note: take(), first(), tail() and head() actions internally calls limit() transformation and finally calls collect() action to collect the data.

In [73]:
df.take(1)

[Row(loan_amnt=10000, funded_amnt=10000, funded_amnt_inv=10000, term=' 36 months', int_rate='10.33%', installment=324.23, grade='B', sub_grade='B1', emp_title=None, emp_length='< 1 year', home_ownership='MORTGAGE', annual_inc=280000, verification_status='Not Verified', issue_d='18-Dec', loan_status='Current', pymnt_plan='n')]

In [74]:
df.first()

Row(loan_amnt=10000, funded_amnt=10000, funded_amnt_inv=10000, term=' 36 months', int_rate='10.33%', installment=324.23, grade='B', sub_grade='B1', emp_title=None, emp_length='< 1 year', home_ownership='MORTGAGE', annual_inc=280000, verification_status='Not Verified', issue_d='18-Dec', loan_status='Current', pymnt_plan='n')

In [75]:
df.head(1)

[Row(loan_amnt=10000, funded_amnt=10000, funded_amnt_inv=10000, term=' 36 months', int_rate='10.33%', installment=324.23, grade='B', sub_grade='B1', emp_title=None, emp_length='< 1 year', home_ownership='MORTGAGE', annual_inc=280000, verification_status='Not Verified', issue_d='18-Dec', loan_status='Current', pymnt_plan='n')]

In [79]:
df.tail(1)

[Row(loan_amnt=22000, funded_amnt=22000, funded_amnt_inv=22000, term=' 60 months', int_rate='11.80%', installment=487.16, grade='B', sub_grade='B4', emp_title='Corporate Communications Strategist', emp_length='10+ years', home_ownership='MORTGAGE', annual_inc=155000, verification_status='Source Verified', issue_d='18-Dec', loan_status='Current', pymnt_plan='n')]

In [80]:
df.limit(1)

DataFrame[loan_amnt: int, funded_amnt: int, funded_amnt_inv: int, term: string, int_rate: string, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: int, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string]

In [81]:
df.limit(1).collect()

[Row(loan_amnt=10000, funded_amnt=10000, funded_amnt_inv=10000, term=' 36 months', int_rate='10.33%', installment=324.23, grade='B', sub_grade='B1', emp_title=None, emp_length='< 1 year', home_ownership='MORTGAGE', annual_inc=280000, verification_status='Not Verified', issue_d='18-Dec', loan_status='Current', pymnt_plan='n')]

In [83]:
df.limit(1).head()

Row(loan_amnt=10000, funded_amnt=10000, funded_amnt_inv=10000, term=' 36 months', int_rate='10.33%', installment=324.23, grade='B', sub_grade='B1', emp_title=None, emp_length='< 1 year', home_ownership='MORTGAGE', annual_inc=280000, verification_status='Not Verified', issue_d='18-Dec', loan_status='Current', pymnt_plan='n')

In [85]:
df.limit(1).show()

+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    10000|      10000|          10000| 36 months|  10.33%|     324.23|    B|       B1|     null|  < 1 year|      MORTGAGE|    280000|       Not Verified| 18-Dec|    Current|         n|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+



In [90]:
df.select('emp_title').show(5)

+--------------------+
|           emp_title|
+--------------------+
|                null|
|                Chef|
|                null|
|                null|
|Instructional Coo...|
+--------------------+
only showing top 5 rows



In [87]:
df.select('emp_title').limit(2).show()

+---------+
|emp_title|
+---------+
|     null|
|     Chef|
+---------+



In [93]:
df.select('emp_title').take(1)

[Row(emp_title=None)]

In [94]:
type(df.select('emp_title'))

pyspark.sql.dataframe.DataFrame

In [40]:
# Returns the list of Row objects
df.head(2)

[Row(loan_amnt=10000, funded_amnt=10000, funded_amnt_inv=10000, term=' 36 months', int_rate='10.33%', installment=324.23, grade='B', sub_grade='B1', emp_title=None, emp_length='< 1 year', home_ownership='MORTGAGE', annual_inc=280000, verification_status='Not Verified', issue_d='18-Dec', loan_status='Current', pymnt_plan='n'),
 Row(loan_amnt=2500, funded_amnt=2500, funded_amnt_inv=2500, term=' 36 months', int_rate='13.56%', installment=84.92, grade='C', sub_grade='C1', emp_title='Chef', emp_length='10+ years', home_ownership='RENT', annual_inc=55000, verification_status='Not Verified', issue_d='18-Dec', loan_status='Current', pymnt_plan='n')]

In [95]:
df.select('emp_title').distinct().show()

+--------------------+
|           emp_title|
+--------------------+
|        UPS/Shipping|
|Corporate Communi...|
|  Nursing Supervisor|
| Sale Representative|
|      Administrative|
|         Casino Host|
|Operations Coordi...|
|Help Tech Supervisor|
|Program Support A...|
|            Security|
|            OPERATOR|
|            Operator|
|           Carpenter|
|              Driver|
|             Teacher|
|Instructional Coo...|
|                Chef|
|                null|
|              BANKER|
|       Asst. Manager|
+--------------------+
only showing top 20 rows



In [96]:
df.select('emp_title').distinct().show(truncate=False)

+-----------------------------------+
|emp_title                          |
+-----------------------------------+
|UPS/Shipping                       |
|Corporate Communications Strategist|
|Nursing Supervisor                 |
|Sale Representative                |
|Administrative                     |
|Casino Host                        |
|Operations Coordinator             |
|Help Tech Supervisor               |
|Program Support Assistant          |
|Security                           |
|OPERATOR                           |
|Operator                           |
|Carpenter                          |
|Driver                             |
|Teacher                            |
|Instructional Coordinator          |
|Chef                               |
|null                               |
|BANKER                             |
|Asst. Manager                      |
+-----------------------------------+
only showing top 20 rows



In [100]:
df.select('*')#.show()

DataFrame[loan_amnt: int, funded_amnt: int, funded_amnt_inv: int, term: string, int_rate: string, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: int, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string]

## Rename columns

In [98]:
df.groupBy('emp_title').count().withColumnRenamed('count','total_count').orderBy('total_count', ascending=False).show(truncate=False)

+------------------+-----------+
|emp_title         |total_count|
+------------------+-----------+
|null              |20950      |
|Teacher           |2090       |
|Manager           |1773       |
|Registered Nurse  |952        |
|Driver            |924        |
|RN                |726        |
|Supervisor        |697        |
|Sales             |580        |
|Project Manager   |526        |
|General Manager   |523        |
|Office Manager    |521        |
|Owner             |420        |
|Director          |402        |
|Operations Manager|387        |
|Truck Driver      |387        |
|Nurse             |326        |
|Engineer          |325        |
|Sales Manager     |304        |
|manager           |301        |
|Supervisor        |270        |
+------------------+-----------+
only showing top 20 rows



In [None]:
# returns a new DF with updated columns
df.toDF('f1', 'f2', 'f3')

# update individual columns
df.withColumnRenamed('old','new')

# using selectExpr
df.selectExpr("old1 as new1", "old2 as new2", "old3 as new3")

# using selectExpr
df.select(col("old1").alias("new1"), col("old2").alias("new2"))

## Drop a column

In [None]:
## No axis column in pyspark
df.drop('column')

## Filtering

In [61]:
df[(df.settlement_percentage >0.5) & (df.settlement_amount==100)]

DataFrame[id: string, member_id: string, loan_amnt: int, funded_amnt: int, funded_amnt_inv: double, term: string, int_rate: string, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: double, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string, url: string, desc: string, purpose: string, title: string, zip_code: string, addr_state: string, dti: double, delinq_2yrs: int, earliest_cr_line: string, inq_last_6mths: int, mths_since_last_delinq: int, mths_since_last_record: int, open_acc: int, pub_rec: int, revol_bal: int, revol_util: string, total_acc: int, initial_list_status: string, out_prncp: double, out_prncp_inv: double, total_pymnt: double, total_pymnt_inv: double, total_rec_prncp: double, total_rec_int: double, total_rec_late_fee: double, recoveries: double, collection_recovery_fee: double, last_pymnt_d: string, last_pymnt_amnt: double, next_pymnt_d: string, last_credit_pu

## Add columns

In pandas - division by 0 gives infinity

In spark - gives null, SQL like

In [104]:
# it will create a new DF, wont change the existing DF, because these are immutable
df.withColumn('new_loan_amnt', (df.loan_amnt *0.5)).show(2)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+-------------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|new_loan_amnt|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+-------------+
|    10000|      10000|          10000| 36 months|  10.33%|     324.23|    B|       B1|     null|  < 1 year|      MORTGAGE|    280000|       Not Verified| 18-Dec|    Current|         n|       5000.0|
|     2500|       2500|           2500| 36 months|  13.56%|      84.92|    C|       C1|     Chef| 10+ years|          RENT|     55000|       Not Verified| 18-Dec|    Current|         n|       1250.0|


# Handle Missing Values

In PySpark DataFrame you can calculate the count of Null, None, NaN or Empty/Blank values in a column by using isNull() of Column class & SQL functions isnan() count() and when(). In this article, I will explain how to get the count of Null, None, NaN, empty or blank values from all or multiple selected columns of PySpark DataFrame.

Note: In Python None is equal to null value, son on PySpark DataFrame None values are shown as null.

In [121]:
df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]).show()

+---------+-----------+---------------+----+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|        0|          0|              0|   0|       0|          0|    0|        0|       20|         0|             0|         0|                  0|      0|          0|         0|
+---------+-----------+---------------+----+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+



In [124]:
df.filter(df['emp_title'].isNull()).show(2)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    10000|      10000|          10000| 36 months|  10.33%|     324.23|    B|       B1|     null|  < 1 year|      MORTGAGE|    280000|       Not Verified| 18-Dec|    Current|         n|
|    12000|      12000|          12000| 60 months|  13.56%|     276.49|    C|       C1|     null|  < 1 year|      MORTGAGE|     40000|       Not Verified| 18-Dec|    Current|         n|
+---------+-----------+---------------+----------+--------+-----------

In [126]:
df.filter(isnan('emp_title')).show(2)

+---------+-----------+---------------+----+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
+---------+-----------+---------------+----+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+



In [None]:
# # how=any, by default, Will delete all rows where there is atleast 1 Null 
df.na.drop()

# how=all, Will delete all rows where ALL cols are Null 
df.na.drop(how="all")

# atleast 2 Non-Null values should be present across cols, else it will delete the row
df.na.drop(how="any", thresh=2)

# whereever there is Null values across subset col, it will delete
df.na.drop(how="any", subset=["member_id"])

DataFrame[id: string, member_id: string, loan_amnt: int, funded_amnt: int, funded_amnt_inv: double, term: string, int_rate: string, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: double, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string, url: string, desc: string, purpose: string, title: string, zip_code: string, addr_state: string, dti: double, delinq_2yrs: int, earliest_cr_line: string, inq_last_6mths: int, mths_since_last_delinq: int, mths_since_last_record: int, open_acc: int, pub_rec: int, revol_bal: int, revol_util: string, total_acc: int, initial_list_status: string, out_prncp: double, out_prncp_inv: double, total_pymnt: double, total_pymnt_inv: double, total_rec_prncp: double, total_rec_int: double, total_rec_late_fee: double, recoveries: double, collection_recovery_fee: double, last_pymnt_d: string, last_pymnt_amnt: double, next_pymnt_d: string, last_credit_pu

## Fill Nulls

Additional options in pandas, wrt pyspark

In [None]:
df.fillna(0)

In [165]:
df.na.fill("Missing", subset=['id','member_id']).select(['id','member_id']).show()

+-------+---------+
|     id|member_id|
+-------+---------+
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
|Missing|  Missing|
+-------+---------+
only showing top 20 rows



In [None]:
# another dict way 
df.na.fill({"id":"Missing", "member_id":0}).select(['id','member_id']).show()

In [183]:
df.corr('settlement_amount','funded_amnt_inv')

-0.0034251235906457385

# Standard Transformations

You can use numpy, but it will be slow. Use the built in functions

In [127]:
df.columns

['loan_amnt',
 'funded_amnt',
 'funded_amnt_inv',
 'term',
 'int_rate',
 'installment',
 'grade',
 'sub_grade',
 'emp_title',
 'emp_length',
 'home_ownership',
 'annual_inc',
 'verification_status',
 'issue_d',
 'loan_status',
 'pymnt_plan']

In [63]:
import pyspark.sql.functions as F

In [128]:
df.select("grade").show(5)

+-----+
|grade|
+-----+
|    B|
|    C|
|    C|
|    C|
|    D|
+-----+
only showing top 5 rows



In [133]:
#withColumn is a TRANSFORMATION

df1 = df.withColumn('new_loan_amnt', (col('loan_amnt')*0.1))
df1.show(1)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+-------------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|new_loan_amnt|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+-------------+
|    10000|      10000|          10000| 36 months|  10.33%|     324.23|    B|       B1|     null|  < 1 year|      MORTGAGE|    280000|       Not Verified| 18-Dec|    Current|         n|       1000.0|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+-------------+


In [135]:
df.withColumn('new_loan_amnt', (col('loan_amnt')*0.1))

DataFrame[loan_amnt: int, funded_amnt: int, funded_amnt_inv: int, term: string, int_rate: string, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: int, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string, new_loan_amnt: double]

In [134]:
df.withColumn('grade', col('grade').cast("string"))

DataFrame[loan_amnt: int, funded_amnt: int, funded_amnt_inv: int, term: string, int_rate: string, installment: double, grade: string, sub_grade: string, emp_title: string, emp_length: string, home_ownership: string, annual_inc: int, verification_status: string, issue_d: string, loan_status: string, pymnt_plan: string]

In [137]:
df.select("loan_amnt", col("funded_amnt").alias("fa")).show(1)

+---------+-----+
|loan_amnt|   fa|
+---------+-----+
|    10000|10000|
+---------+-----+
only showing top 1 row



## Filter & where

In [143]:
df.filter(col("grade")=="D").show(2)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|           emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    16000|      16000|          16000| 60 months|  17.97%|     406.04|    D|       D1|Instructional Coo...|   5 years|      MORTGAGE|     51000|       Not Verified| 18-Dec|    Current|         n|
|     3500|       3500|           3500| 36 months|  20.89%|     131.67|    D|       D4|       gas attendant| 10+ years|      MORTGAGE|     40000|    Source Verified| 18-Dec|    Current|         n|
+---------+----

In [156]:
df.where(col("grade")=="D").show(2)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|           emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    16000|      16000|          16000| 60 months|  17.97%|     406.04|    D|       D1|Instructional Coo...|   5 years|      MORTGAGE|     51000|       Not Verified| 18-Dec|    Current|         n|
|     3500|       3500|           3500| 36 months|  20.89%|     131.67|    D|       D4|       gas attendant| 10+ years|      MORTGAGE|     40000|    Source Verified| 18-Dec|    Current|         n|
+---------+----

In [147]:
df.filter((col("grade")=="D") & (col("loan_amnt")>10000)).show(2)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|           emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    16000|      16000|          16000| 60 months|  17.97%|     406.04|    D|       D1|Instructional Coo...|   5 years|      MORTGAGE|     51000|       Not Verified| 18-Dec|    Current|         n|
|    30000|      30000|          30000| 60 months|  18.94%|     777.23|    D|       D2|         Postmaster | 10+ years|      MORTGAGE|     90000|    Source Verified| 18-Dec|    Current|         n|
+---------+----

In [157]:
df.where((col("grade")=="D") & (col("loan_amnt")>10000)).show(2)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|           emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    16000|      16000|          16000| 60 months|  17.97%|     406.04|    D|       D1|Instructional Coo...|   5 years|      MORTGAGE|     51000|       Not Verified| 18-Dec|    Current|         n|
|    30000|      30000|          30000| 60 months|  18.94%|     777.23|    D|       D2|         Postmaster | 10+ years|      MORTGAGE|     90000|    Source Verified| 18-Dec|    Current|         n|
+---------+----

In [148]:
df.filter(col("grade").isin(["A","B"])).show(1)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    10000|      10000|          10000| 36 months|  10.33%|     324.23|    B|       B1|     null|  < 1 year|      MORTGAGE|    280000|       Not Verified| 18-Dec|    Current|         n|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
only showing top 1 row



In [158]:
df.where(col("grade").isin(["A","B"])).show(1)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    10000|      10000|          10000| 36 months|  10.33%|     324.23|    B|       B1|     null|  < 1 year|      MORTGAGE|    280000|       Not Verified| 18-Dec|    Current|         n|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
only showing top 1 row



## Group by

In [151]:
df.groupBy("grade").sum('loan_amnt').show()

+-----+--------------+
|grade|sum(loan_amnt)|
+-----+--------------+
|    E|        132350|
|    B|        478550|
|    D|        313025|
|    C|        551400|
|    A|         45000|
+-----+--------------+



In [155]:
df.groupBy("grade").agg(sum('loan_amnt').alias('sum_LA')).sort(col('sum_LA').asc()).show()

+-----+------+
|grade|sum_LA|
+-----+------+
|    A| 45000|
|    E|132350|
|    D|313025|
|    B|478550|
|    C|551400|
+-----+------+



## Merge/Join DFs

In [None]:
df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").sort(desc("df_as1.name")).collect()

## Union

In [162]:
df_as1.union(df_as2)

## Case Function (When.Otherwise )

In [169]:
# Multiple conditions on multiple columns

df.withColumn("loan_level", when(col("loan_amnt") < 1000, "Low")\
                            .when((col("loan_amnt") > 1000) & (col("funded_amnt") > 20000), "Mid")\
                            .otherwise("High"))\
                            .select("loan_amnt","loan_level").show(5)

+---------+----------+
|loan_amnt|loan_level|
+---------+----------+
|    10000|      High|
|     2500|      High|
|    12000|      High|
|    15000|      High|
|    16000|      High|
+---------+----------+
only showing top 5 rows



In [167]:
df.columns

['loan_amnt',
 'funded_amnt',
 'funded_amnt_inv',
 'term',
 'int_rate',
 'installment',
 'grade',
 'sub_grade',
 'emp_title',
 'emp_length',
 'home_ownership',
 'annual_inc',
 'verification_status',
 'issue_d',
 'loan_status',
 'pymnt_plan']

## Pivot table

In [171]:
df.columns

['loan_amnt',
 'funded_amnt',
 'funded_amnt_inv',
 'term',
 'int_rate',
 'installment',
 'grade',
 'sub_grade',
 'emp_title',
 'emp_length',
 'home_ownership',
 'annual_inc',
 'verification_status',
 'issue_d',
 'loan_status',
 'pymnt_plan']

In [172]:
df.groupBy('grade').pivot('term').sum('loan_amnt').show()

+-----+----------+----------+
|grade| 36 months| 60 months|
+-----+----------+----------+
|    E|     52350|     80000|
|    B|    194575|    283975|
|    D|     47950|    265075|
|    C|    248000|    303400|
|    A|     45000|      null|
+-----+----------+----------+



In [178]:
df_p = df.toPandas()
df_p.groupby(["grade","term"])["loan_amnt"].sum().unstack()

term,36 months,60 months
grade,Unnamed: 1_level_1,Unnamed: 2_level_1
A,45000.0,
B,194575.0,283975.0
C,248000.0,303400.0
D,47950.0,265075.0
E,52350.0,80000.0


## Summary Statistics

In [81]:
# df.describe().show()

## SQL Support
First register your DF as a table, If we exit, the table will be lost. To save permanently, use save options.

In [None]:
# create a temporory table, result is a DF
df1.createOrReplaceTempView('table_name')
df2 = spark.sql('select * from tablename')

## Timestamp

In [101]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: doubl

In [116]:
df.select('hardship_start_date').filter(df.hardship_start_date == 'Feb-19').show()

+-------------------+
|hardship_start_date|
+-------------------+
|             Feb-19|
+-------------------+



In [115]:
df.select('hardship_start_date', to_date(df.hardship_start_date, 'mm-yy').alias('settlement_TS')).distinct().show()

+-------------------+-------------+
|hardship_start_date|settlement_TS|
+-------------------+-------------+
|               null|         null|
|             Feb-19|         null|
+-------------------+-------------+



## Window Functions

In [191]:
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("grade").orderBy(col("loan_amnt").desc())

In [194]:
df.select("grade","loan_amnt").withColumn("rank", dense_rank().over(windowSpec)).sort(asc("grade")).filter(col("rank")<3).show()

+-----+---------+----+
|grade|loan_amnt|rank|
+-----+---------+----+
|    A|    18000|   2|
|    A|    20000|   1|
|    B|    35000|   2|
|    B|    40000|   1|
|    C|    40000|   1|
|    C|    35000|   2|
|    D|    35000|   1|
|    D|    35000|   1|
|    D|    34575|   2|
|    D|    35000|   1|
|    E|    30000|   1|
|    E|    24000|   2|
+-----+---------+----+



In [190]:
df.show(1)

+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|loan_amnt|funded_amnt|funded_amnt_inv|      term|int_rate|installment|grade|sub_grade|emp_title|emp_length|home_ownership|annual_inc|verification_status|issue_d|loan_status|pymnt_plan|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
|    10000|      10000|          10000| 36 months|  10.33%|     324.23|    B|       B1|     null|  < 1 year|      MORTGAGE|    280000|       Not Verified| 18-Dec|    Current|         n|
+---------+-----------+---------------+----------+--------+-----------+-----+---------+---------+----------+--------------+----------+-------------------+-------+-----------+----------+
only showing top 1 row



## UDF (User Defined Function)

In [151]:
lowr = udf(lambda x : x.lower())

## Memory, Caching, Partitions

In [None]:
spark.catalog.cacheTable("tableName")
dataFrame.cache()
spark.catalog.uncacheTable("tableName")

In [117]:
# There are 8 partitions 
df.rdd.getNumPartitions()

8

In [118]:
df.repartition(6).createOrReplaceTempView("df_repartitioned")

In [119]:
spark.catalog.cacheTable("df_repartitioned")

In [122]:
spark.sql('select distinct hardship_start_date from df_repartitioned').show()

+-------------------+
|hardship_start_date|
+-------------------+
|               null|
|             Feb-19|
+-------------------+



In [124]:
new_df = spark.table("df_repartitioned")

In [125]:
new_df.count()

128416

In [127]:
spark.catalog.isCached("df_repartitioned")

True

1. It will read teh data from S3 or fro many source, cache that into the memory for further use by reducing the size significantly

2. Repartitiong helps us to reduce the DF size which is in GBs to MBs, because it compresses them columnar wise. 

3. Try to have each partition size to be in between 50-200MB, dont have multi KB or multi GB partition for better performance

Reference - https://youtu.be/K14plpZgy_c?list=PLr-OUJWJfNGEggwo6Vt5LZGRMR1yNyUvg&t=3399

## Coalesce

Returns a new DataFrame that has exactly numPartitions partitions.

Similar to coalesce defined on an RDD, this operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions is requested, it will stay at the current number of partitions.

However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition(). This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

In [None]:
dataframe.coalesce(1).rdd.getNumPartitions()

## Write to parquet

In [None]:
df.write.format('parquet').save('/folder/df_parquet')

In [42]:
# df.filter("grade='A'").select("grade").show()
tw_filter = df.filter(df.grade=='A').select("grade").show()

+-----+
|grade|
+-----+
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
|    A|
+-----+
only showing top 20 rows



In [43]:
tw_filter.explain()

AttributeError: 'NoneType' object has no attribute 'explain'

In [124]:
# Run SQL on files directly
spark.sql("select _c0, _c1 from csv. `LoanStats_2018Q4.csv`").show()

+----+---------+
| _c0|      _c1|
+----+---------+
|  id|member_id|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
+----+---------+
only showing top 20 rows



In [34]:
# for c in df.columns:
#     df = df.withColumnRenamed(c, c.replace(" ","_")) \
#         .withColumnRenamed(c, c.replace(".",""))
        
# df = df.withColumnRenamed("Engine_(ltr)", "Engine_ltr")

In [115]:
spark.sql("select disbursement_method from loan_status").show(5)

+-------------------+
|disbursement_method|
+-------------------+
|          DirectPay|
|               Cash|
|               Cash|
|               Cash|
|               Cash|
+-------------------+
only showing top 5 rows



In [20]:
df.groupBy("addr_state")\
    .agg({"addr_state":'count'})\
    .withColumnRenamed("count(addr_state)", "avg_income")\
    .show()

+----------+----------+
|addr_state|avg_income|
+----------+----------+
|        SC|      1692|
|        AZ|      3149|
|        LA|      1328|
|        MN|      2078|
|        NJ|      4711|
|        DC|       274|
|        OR|      1562|
|        VA|      3364|
|      null|         0|
|        RI|       614|
|        KY|      1280|
|        WY|       241|
|        NH|       643|
|        MI|      3249|
|        NV|      2006|
|        WI|      1729|
|        ID|       501|
|        CA|     17879|
|        NE|       642|
|        CT|      2038|
+----------+----------+
only showing top 20 rows



In [28]:
sorted(df.columns)

['acc_now_delinq',
 'acc_open_past_24mths',
 'addr_state',
 'all_util',
 'annual_inc',
 'annual_inc_joint',
 'application_type',
 'avg_cur_bal',
 'bc_open_to_buy',
 'bc_util',
 'chargeoff_within_12_mths',
 'collection_recovery_fee',
 'collections_12_mths_ex_med',
 'debt_settlement_flag',
 'debt_settlement_flag_date',
 'deferral_term',
 'delinq_2yrs',
 'delinq_amnt',
 'desc',
 'disbursement_method',
 'dti',
 'dti_joint',
 'earliest_cr_line',
 'emp_length',
 'emp_title',
 'funded_amnt',
 'funded_amnt_inv',
 'grade',
 'hardship_amount',
 'hardship_dpd',
 'hardship_end_date',
 'hardship_flag',
 'hardship_last_payment_amount',
 'hardship_length',
 'hardship_loan_status',
 'hardship_payoff_balance_amount',
 'hardship_reason',
 'hardship_start_date',
 'hardship_status',
 'hardship_type',
 'home_ownership',
 'id',
 'il_util',
 'initial_list_status',
 'inq_fi',
 'inq_last_12m',
 'inq_last_6mths',
 'installment',
 'int_rate',
 'issue_d',
 'last_credit_pull_d',
 'last_pymnt_amnt',
 'last_pymnt_d'

In [52]:
df.stat.cov("Registration_Year", "Scrappage_volume")

60.38840887987951

In [53]:
df.stat.corr("Registration_Year", "Scrappage_volume")

0.10713454005480579

In [61]:
df.groupBy("Make").count().orderBy('count',ascending=False).show()

+-------------+------+
|         Make| count|
+-------------+------+
|    CHEVROLET|127335|
|         FORD|115475|
|          GMC| 81739|
|        DODGE| 72821|
|MERCEDES-BENZ| 47042|
|       TOYOTA| 42135|
|          BMW| 31962|
|       NISSAN| 31257|
|      PONTIAC| 30856|
|   VOLKSWAGEN| 28435|
|         AUDI| 26378|
|   OLDSMOBILE| 24273|
|     CHRYSLER| 23933|
|        BUICK| 23195|
|     CADILLAC| 21132|
|        VOLVO| 20666|
|        MAZDA| 20378|
|         JEEP| 15717|
|        HONDA| 15717|
|      HYUNDAI| 15366|
+-------------+------+
only showing top 20 rows



In [69]:
# df.where(df.Make.isin(["BMW", "VOLVO"])).show()

# df.filter(df.Make == "BMW").show()
# df.groupBy("Make","Scrappage_volume").mean()

## regexp_replace, regexp_extract

# Leetcode

In [25]:
file_location = "c://Users//akash.mathur//Downloads//Akash//Spark/Book1.csv"
file_type = "csv"

infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# df = spark.read.format('csv')\
#     .option("inferSchema", infer_schema)\
#     .option("header", first_row_is_header)\
#     .option("sep", delimiter)\
#     .load(file_location)
    
df = spark.read.format('csv')\
    .options(inferSchema= infer_schema,
    header= first_row_is_header,
    sep= delimiter)\
    .load(file_location)

# ALTERNATE
# spark.read.load("LoanStats_2018Q4.csv",
#                      format="csv", sep=";", inferSchema="true", header="true")

# spark.read.csv("LoanStats_2018Q4.csv",
#                      inferSchema="true", header="true")

AnalysisException: Path does not exist: file:/c:/Users/akash.mathur/Downloads/Akash/Spark/Book1.csv

In [21]:
# data = [
#     (1,1,1),
#     (1,1,1),
#     (1,1,2),
#     (1,2,3),
#     (1,2,4),
#     (2,1,5),
#     (2,1,6)
#     ]

data = [["node.js", "dbms", "integration"],
        ["jsp", "SQL", "trigonometry"],
        ["php", "oracle", "statistics"],
        [".net", "db2", "Machine Learning"]]

columns=['actor_id','director_id','timestamp']

sdf = spark.createDataFrame(data, columns)

In [23]:
sdf.show()

Py4JJavaError: An error occurred while calling o137.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7) (INGGNLT1PYD593.INFO.CORP executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:458)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:458)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 29 more


In [22]:
# sdf.withColumn('actor_id', col('actor_id').cast("Integer"))\
#     .withColumn('director_id', col('director_id').cast("Integer"))\
#     .withColumn('timestamp', col('timestamp').cast("Integer"))\
#     .show()

In [13]:
sdf.printSchema()

root
 |-- actor_id: long (nullable = true)
 |-- director_id: long (nullable = true)
 |-- timestamp: long (nullable = true)



# Germany

In [212]:
import re

In [196]:
germany = spark.read.csv('germany_newreg.csv', inferSchema=True, header=True)
df = germany.cache()

In [247]:
df = df.toDF(*[re.sub("[^a-zA-Z0-9]","",column) for column in df.columns])

used_cols = ["Make","RegistrationType","Model","SubModelShort","BodyType","FuelType","DrivenWheels","NoofDoors","Turbo","Engineltr","Engineccm","EnginekW",
                 "GrossVehicleWeight","NoofCylinders","1980","1981","1982","1983","1984","1985","1986","1987","1988","1989","1990","1991","1992","1993","1994","1995","1996","1997",
                 "1998","1999","2000","2001","2002","2003","2004","2005","2006","2007","2008","2009","2010","2011","2012","2013","2014","2015","2016","2017","2018","2019","2020","2021"]

df = df.select(used_cols)
df = df.filter(df['RegistrationType'] != "Heavy Commercial Vehicles")

In [304]:
def melt_df(
        df: DataFrame,
        id_vars: str, value_vars: str,
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

In [350]:
df1 = melt_df(df=df, 
        id_vars=["Make","RegistrationType","Model","SubModelShort","BodyType","FuelType","DrivenWheels","NoofDoors","Turbo","Engineltr","Engineccm","EnginekW",
                 "GrossVehicleWeight","NoofCylinders"],
        value_vars=["1980","1981","1982","1983","1984","1985","1986","1987","1988","1989","1990","1991","1992","1993","1994","1995","1996","1997",
                 "1998","1999","2000","2001","2002","2003","2004","2005","2006","2007","2008","2009","2010","2011","2012","2013","2014","2015","2016","2017","2018","2019","2020","2021"],
        
        var_name = 'year',
        value_name = 'sales')       

In [340]:
string_cols = [item[0] for item in df1.dtypes if item[1]=='string']
non_string_cols = [item[0] for item in df1.dtypes if item[1]!='string']

In [351]:
df1 = df1.select(*[F.upper(c).alias(c) for c in string_cols], *non_string_cols)

In [356]:
df1 = df1.withColumn('year', df1['year'].cast(IntegerType()))
df1 = df1.filter(df1.sales>0)

In [561]:
# https://sparkbyexamples.com/pyspark/pyspark-column-functions/
df1.EnginekW = df1.EnginekW.astype('int')
df1.Engineltr = df1.Engineltr.cast('int')

In [564]:
df1.printSchema()

root
 |-- Make: string (nullable = true)
 |-- RegistrationType: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- SubModelShort: string (nullable = true)
 |-- BodyType: string (nullable = true)
 |-- FuelType: string (nullable = true)
 |-- DrivenWheels: string (nullable = true)
 |-- Turbo: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- NoofDoors: integer (nullable = true)
 |-- Engineltr: double (nullable = true)
 |-- Engineccm: integer (nullable = true)
 |-- EnginekW: integer (nullable = true)
 |-- GrossVehicleWeight: integer (nullable = true)
 |-- NoofCylinders: integer (nullable = true)
 |-- sales: integer (nullable = true)



In [446]:
df1.agg({'sales':'sum'}).show()

+----------+
|sum(sales)|
+----------+
| 134926194|
+----------+



In [277]:
df1.groupby('year').agg({'sales':'sum'}).orderBy('year', ascending=True).show(50)

+----+----------+
|year|sum(sales)|
+----+----------+
|1980|         0|
|1981|   2330333|
|1982|   2155537|
|1983|   2426774|
|1984|   2393939|
|1985|   2379261|
|1986|   2829438|
|1987|   2915654|
|1988|   2807939|
|1989|   2831740|
|1990|   3158581|
|1991|   4358648|
|1992|   4140027|
|1993|   3360786|
|1994|   3383613|
|1995|   3483517|
|1996|   3659240|
|1997|   3702780|
|1998|   3927768|
|1999|   4008736|
|2000|   3579661|
|2001|   3536380|
|2002|   3435717|
|2003|   3413622|
|2004|   3452449|
|2005|   3517391|
|2006|   3665154|
|2007|   3369760|
|2008|   3313329|
|2009|   3976611|
|2010|   3112815|
|2011|   3407073|
|2012|   3301942|
|2013|   3165136|
|2014|   3265110|
|2015|   3443978|
|2016|   3609656|
|2017|   3711972|
|2018|   3721125|
|2019|   3912258|
|2020|   3185568|
|2021|   1575176|
+----+----------+



In [359]:
# doors imputation 

doors_dict = {'BUS CITY': 2,
 'BUS MULTI PURPOSE': 2,
 'BUS TOURING COACH': 2,
 'BUS UNSPEC.': 2,
 'CAR PICKUP': 2,
 'CAR UNSPEC.': 4,
 'COMMERCIAL UNSPEC.': 2,
 'CONVERTIBLE': 2,
 'COUPE': 2,
 'ESTATE HIGH VOLUME': 4,
 'HATCHBACK': 5,
 'HCV RECREATIONAL TRUCK': 2,
 'HCV RIGID TRUCK': 2,
 'HCV TRACTOR TRUCK': 2,
 'HCV TRUCK UNSPEC.': 2,
 'LCV COMBI VAN': 4,
 'LCV PANEL VAN': 4,
 'LCV PANEL VAN DOUBLE CAB': 4,
 'LCV PICKUP VAN': 4,
 'LCV PICKUP VAN DOUBLE CAB': 4,
 'LCV RECREATIONAL VAN': 4,
 'LCV TIPPER': 2,
 'LCV TRACTOR': 2,
 'LCV TRUCK/CHASSIS CAB': 2,
 'LCV VAN BUS': 4,
 'LCV VAN/TRUCK UNSPEC.': 4,
 'MONOSPACE': 4,
 'MONOSPACE PANEL VAN': 4,
 'MONOSPACE UNSPEC.': 4,
 'PICKUP DOUBLE CAB': 4,
 'PICKUP LONG CAB': 4,
 'PICKUP SINGLE CAB': 2,
 'PICKUP TIPPER': 2,
 'PICKUP UNSPEC.': 2,
 'RETRACTABLE HARDTOP': 2,
 'ROADSTER': 2,
 'SEDAN': 4,
 'SUV CLOSED': 4,
 'SUV OPEN': 4,
 'SUV PICKUP': 4,
 'SUV UNSPEC.': 4,
 'TARGA/T-ROOF': 2,
 'UNSPECIFIED': 4,
 'WAGON': 5}

# # Mapped doors based on Body type
# germany['No. of Doors'] = germany['Body Type'].map(doors_dict)
# # Replace -2/-1 with NaN
# germany.replace(-2, -1, inplace=True)
# germany.replace(-1, np.nan,inplace=True)

In [365]:
map_col  = F.create_map([F.lit(x) for i in doors_dict.items() for x in i])
df1 = df1.withColumn('NoofDoors', map_col[F.col('BodyType')])

In [667]:
s= to_date('2022-01-28')
type(s)

pyspark.sql.column.Column

In [679]:
df.select(collect_set("Turbo")).show(truncate=False)

+-------------------------------+
|collect_set(Turbo)             |
+-------------------------------+
|[Non turbo, Unspecified, Turbo]|
+-------------------------------+



In [447]:
df1.show()

+----------+----------------+---------------+-------------+----------+---------------+------------+---------+----+---------+---------+---------+--------+------------------+-------------+-----+
|      Make|RegistrationType|          Model|SubModelShort|  BodyType|       FuelType|DrivenWheels|    Turbo|year|NoofDoors|Engineltr|Engineccm|EnginekW|GrossVehicleWeight|NoofCylinders|sales|
+----------+----------------+---------------+-------------+----------+---------------+------------+---------+----+---------+---------+---------+--------+------------------+-------------+-----+
|    AIWAYS|  PASSENGER CARS|      AIWAYS U5|           U5|SUV CLOSED|ELECTRIC W/OREX|       FRONT|NON TURBO|2021|        4|      0.0|        0|     140|              2155|            0|    3|
|    AIWAYS|  PASSENGER CARS|      AIWAYS U5|           U5|SUV CLOSED|ELECTRIC W/OREX|       FRONT|NON TURBO|2020|        4|      0.0|        0|     140|              2155|            0|    8|
|    AIWAYS|  PASSENGER CARS|      

In [368]:
df1.agg({'sales':'sum'}).show()

+----------+
|sum(sales)|
+----------+
| 134926194|
+----------+



In [412]:
df1.filter(df1['GrossVehicleWeight']==-2).show(50)

+----+----------------+-----------+--------------+--------------------+--------+------------+-----+----+---------+---------+---------+--------+------------------+-------------+-----+
|Make|RegistrationType|      Model| SubModelShort|            BodyType|FuelType|DrivenWheels|Turbo|year|NoofDoors|Engineltr|Engineccm|EnginekW|GrossVehicleWeight|NoofCylinders|sales|
+----+----------------+-----------+--------------+--------------------+--------+------------+-----+----+---------+---------+---------+--------+------------------+-------------+-----+
|FIAT|  PASSENGER CARS|FIAT DUCATO|DUCATO 18/MAXI|LCV VAN/TRUCK UNS...|  DIESEL|       FRONT|TURBO|1980|        4|      2.3|     2287|      -1|                -2|            4|    0|
|FIAT|  PASSENGER CARS|FIAT DUCATO|DUCATO 18/MAXI|LCV VAN/TRUCK UNS...|  DIESEL|       FRONT|TURBO|1981|        4|      2.3|     2287|      -1|                -2|            4|    0|
|FIAT|  PASSENGER CARS|FIAT DUCATO|DUCATO 18/MAXI|LCV VAN/TRUCK UNS...|  DIESEL|     

In [448]:
df2 = df1.replace([-1,-2], [None, None])

In [428]:
df2.filter((df2['Engineccm']==2287) & (df2['SubModelShort']=='DUCATO 18/MAXI')).show(50)

+----+----------------+-----------+--------------+--------------------+--------+------------+-----+----+---------+---------+---------+--------+------------------+-------------+-----+
|Make|RegistrationType|      Model| SubModelShort|            BodyType|FuelType|DrivenWheels|Turbo|year|NoofDoors|Engineltr|Engineccm|EnginekW|GrossVehicleWeight|NoofCylinders|sales|
+----+----------------+-----------+--------------+--------------------+--------+------------+-----+----+---------+---------+---------+--------+------------------+-------------+-----+
|FIAT|  PASSENGER CARS|FIAT DUCATO|DUCATO 18/MAXI|LCV VAN/TRUCK UNS...|  DIESEL|       FRONT|TURBO|1980|        4|      2.3|     2287|    null|              null|            4|    0|
|FIAT|  PASSENGER CARS|FIAT DUCATO|DUCATO 18/MAXI|LCV VAN/TRUCK UNS...|  DIESEL|       FRONT|TURBO|1981|        4|      2.3|     2287|    null|              null|            4|    0|
|FIAT|  PASSENGER CARS|FIAT DUCATO|DUCATO 18/MAXI|LCV VAN/TRUCK UNS...|  DIESEL|     

# Missing Values

In [385]:
df2.select(count(when(isnan('Engineltr'),'Engineltr')).alias('Engineltr')).show()

+---------+
|Engineltr|
+---------+
|   165942|
+---------+



In [429]:
df2.select(count(when(isnan('GrossVehicleWeight')|col('GrossVehicleWeight').isNull(),'GrossVehicleWeight')).alias('GrossVehicleWeight')).show()

+------------------+
|GrossVehicleWeight|
+------------------+
|            592284|
+------------------+



In [449]:
df3 = df2.select([sum(when(isnan(c) | col(c).isNull(),1).otherwise(0)).alias(c) for c in df2.columns])
df3.show()

+----+----------------+-----+-------------+--------+--------+------------+-----+----+---------+---------+---------+--------+------------------+-------------+-----+
|Make|RegistrationType|Model|SubModelShort|BodyType|FuelType|DrivenWheels|Turbo|year|NoofDoors|Engineltr|Engineccm|EnginekW|GrossVehicleWeight|NoofCylinders|sales|
+----+----------------+-----+-------------+--------+--------+------------+-----+----+---------+---------+---------+--------+------------------+-------------+-----+
|   0|               0|    0|            0|       0|       0|           0|    0|   0|        0|     6295|     6256|   10980|             24222|        26298|    0|
+----+----------------+-----+-------------+--------+--------+------------+-----+----+---------+---------+---------+--------+------------------+-------------+-----+



In [486]:
turbos = df2.select('Turbo').distinct().collect()
turbos = [turbos[e]['Turbo'] for e in range(len(turbos))]

['UNSPECIFIED', 'NON TURBO', 'TURBO']

In [546]:
t = df2.select('Turbo').distinct().collect()

In [551]:
t[0].Turbo

'UNSPECIFIED'

In [488]:
# https://sparkbyexamples.com/pyspark/pyspark-pivot-and-unpivot-dataframe/#:~:text=PySpark%20pivot()%20function%20is,individual%20columns%20with%20distinct%20data.
pivot_df = df2.groupBy(['Make','Model']).pivot('Turbo', turbos).sum('sales')
pivot_df.show()

+---------------+--------------------+-----------+---------+------+
|           Make|               Model|UNSPECIFIED|NON TURBO| TURBO|
+---------------+--------------------+-----------+---------+------+
|          DACIA|       DACIA SANDERO|          2|   141126|128481|
|   VOLVO TRUCKS|VOLVO TRUCKS FH-S...|       null|     null|     3|
|           SEAT|          SEAT TERRA|         15|    20728|  null|
|            BMW|              BMW X2|         64|     null| 37262|
|            GAZ|         GAZ UNSPEC.|          1|     null|  null|
|        CITROEN|  CITROEN C3 PICASSO|       null|    19922| 12935|
|      SSANGYONG|     SSANGYONG MUSSO|       null|     null|   381|
|          ROVER|            ROVER 45|       null|     4319|  1171|
|          VOLVO|       VOLVO UNSPEC.|       1934|      391|   335|
|         DAEWOO|      DAEWOO LACETTI|       null|     1935|  null|
|      CHEVROLET|   CHEVROLET LACETTI|       null|     5004|   559|
|GIOTTI VICTORIA|GIOTTI VICTORIA G...|       nul

In [565]:
pivot_df.select((col('NON TURBO')+col('TURBO')).alias('excel')).show(1)

+------+
| excel|
+------+
|269607|
+------+
only showing top 1 row



In [495]:
df2.createOrReplaceTempView('germany_scrappage')

In [None]:
df2.groupBy

In [498]:
spark.sql('select * from germany_scrappage limit 5').show()

+----------+----------------+---------------+-------------+----------+---------------+------------+---------+----+---------+---------+---------+--------+------------------+-------------+-----+
|      Make|RegistrationType|          Model|SubModelShort|  BodyType|       FuelType|DrivenWheels|    Turbo|year|NoofDoors|Engineltr|Engineccm|EnginekW|GrossVehicleWeight|NoofCylinders|sales|
+----------+----------------+---------------+-------------+----------+---------------+------------+---------+----+---------+---------+---------+--------+------------------+-------------+-----+
|    AIWAYS|  PASSENGER CARS|      AIWAYS U5|           U5|SUV CLOSED|ELECTRIC W/OREX|       FRONT|NON TURBO|2021|        4|      0.0|        0|     140|              2155|            0|    3|
|    AIWAYS|  PASSENGER CARS|      AIWAYS U5|           U5|SUV CLOSED|ELECTRIC W/OREX|       FRONT|NON TURBO|2020|        4|      0.0|        0|     140|              2155|            0|    8|
|    AIWAYS|  PASSENGER CARS|      

In [507]:
spark.sql('select NoofCylinders, count(NoofCylinders) as CC from germany_scrappage group by NoofCylinders order by CC DESC').show()

+-------------+------+
|NoofCylinders|    CC|
+-------------+------+
|            4|340456|
|            6| 52686|
|            5| 26754|
|            8| 18662|
|            3| 11026|
|            0|  3917|
|           12|  2586|
|            2|   766|
|           10|   550|
|            1|   178|
|           16|    43|
|         null|     0|
+-------------+------+



In [513]:
x = spark.sql('SELECT year, Make, Model, SubModelShort, FuelType, BodyType, NoofCylinders, count(NoofCylinders) as CC from germany_scrappage group by 1,2,3,4,5,6,7 order by CC DESC')
x.show()

+----+----------+--------------------+------------------+--------+--------------------+-------------+---+
|year|      Make|               Model|     SubModelShort|FuelType|            BodyType|NoofCylinders| CC|
+----+----------+--------------------+------------------+--------+--------------------+-------------+---+
|2005|      FIAT|         FIAT DUCATO|    DUCATO 18/MAXI|  DIESEL|LCV RECREATIONAL VAN|            4|421|
|2015|VOLKSWAGEN|VOLKSWAGEN TRANSP...|       TRANSPORTER|  DIESEL|LCV VAN/TRUCK UNS...|            4|392|
|2016|VOLKSWAGEN|VOLKSWAGEN TRANSP...|       TRANSPORTER|  DIESEL|LCV VAN/TRUCK UNS...|            4|356|
|2006|      FIAT|         FIAT DUCATO|    DUCATO 18/MAXI|  DIESEL|LCV RECREATIONAL VAN|            4|318|
|2017|VOLKSWAGEN|VOLKSWAGEN TRANSP...|       TRANSPORTER|  DIESEL|LCV VAN/TRUCK UNS...|            4|297|
|2012|      FIAT|         FIAT DUCATO|    DUCATO 40/MAXI|  DIESEL|LCV RECREATIONAL VAN|            4|270|
|2019|VOLKSWAGEN|VOLKSWAGEN TRANSP...|       T

In [521]:
x.columns

['year',
 'Make',
 'Model',
 'SubModelShort',
 'FuelType',
 'BodyType',
 'NoofCylinders',
 'CC']

In [527]:
x1 = x.dropDuplicates(subset = ['year',
 'Make',
 'Model',
 'SubModelShort',
 'FuelType',
 'BodyType',
 'NoofCylinders'])

In [525]:
x.count()

80099

In [528]:
x1.count()

80099

In [523]:
# x.write.csv('x.csv')

# x.coalesce(1).write.option("header",True).format("csv").save("xx")

In [590]:
group_cols = ['year','Make', 'Model','SubModelShort', 'FuelType', 'BodyType', 'NoofCylinders']
x = df2.groupBy(group_cols).agg(count('NoofCylinders').alias('CC')).orderBy(col('CC').desc(), col('year').asc())
# x.show(truncate=False)

In [597]:
x1 = x.dropDuplicates(['year','Make', 'Model','SubModelShort', 'FuelType', 'BodyType'])

In [600]:
# x1.filter((col("year")==2002) & (col("SubModelShort")=="POLO") & (col("BodyType")=="HATCHBACK")).show()

In [592]:
x1.show()

+----+-------------+--------------------+--------------+-----------+------------------+-------------+---+----------+
|year|         Make|               Model| SubModelShort|   FuelType|          BodyType|NoofCylinders| CC|dense_rank|
+----+-------------+--------------------+--------------+-----------+------------------+-------------+---+----------+
|1981|       NISSAN|        NISSAN SUNNY|         SUNNY|     PETROL|             SEDAN|            4|  4|         1|
|1981|  ROLLS-ROYCE| ROLLS-ROYCE UNSPEC.|   UNSPECIFIED|UNSPECIFIED|       CAR UNSPEC.|         null|  0|         1|
|1981|         SAAB|          SAAB 90/99|         90/99|     PETROL|             SEDAN|            4|  5|         1|
|1982|MERCEDES-BENZ|MERCEDES-BENZ E-C...|           230|     PETROL|             WAGON|            4|  2|         1|
|1982|      PORSCHE|         PORSCHE 911|   911 CARRERA|     PETROL|             COUPE|            6|  1|         1|
|1982|         SEAT|        SEAT UNSPEC.|   UNSPECIFIED|UNSPECIF

In [604]:
row = df2.select("year").distinct().collect()
years = sorted([row[x]['year'] for x in range(len(row))])

In [643]:
def scrappage_year_by_age(age):
    """
    This will create a dataframe of last N number of years based on "age" parameter
    
    Args:
        df (pandas dataframe): [new reg dataframe]

    Returns:
        [pandas dataframe]: [Sales percentage contribution of newreg dataframe]
    """
    df = spark.createDataFrame([], schema)

    for year in [2000]:
        lst = []
        for e in range(age * 2 + 1):
            lst.append(year - e)

        df.withColumn("historic_years",  F.array( [F.lit(x) for x in lst] ))
        
        # df["historic_years" + "_" + year.astype(str)] = (pd.Series(lst).sort_values(ascending=True).reset_index(drop=True))

    return df

In [644]:
schema = StructType([
  StructField('historic_years', IntegerType(), True)
  # StructField('middlename', StringType(), True),
  # StructField('lastname', StringType(), True)
  ])

In [651]:
df2.columns

['Make',
 'RegistrationType',
 'Model',
 'SubModelShort',
 'BodyType',
 'FuelType',
 'DrivenWheels',
 'Turbo',
 'year',
 'NoofDoors',
 'Engineltr',
 'Engineccm',
 'EnginekW',
 'GrossVehicleWeight',
 'NoofCylinders',
 'sales']

In [655]:
df2.select(first("year")).show()

+-----------+
|first(year)|
+-----------+
|       2021|
+-----------+



In [648]:
scrappage_year_by_age(17).collect()

Py4JJavaError: An error occurred while calling o9237.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 612.0 failed 1 times, most recent failure: Lost task 0.0 in stage 612.0 (TID 17911) (INGGNLT1PYD593.INFO.CORP executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:458)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3519)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3516)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:458)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 29 more


In [627]:
l = list(np.arange(1981,1990))

# [x.cast(IntegerType()) for x in l]
# [F.lit(x) for x in l]

In [624]:
F.array([F.lit()])

TypeError: createDataFrame() missing 1 required positional argument: 'data'

In [587]:
x1 = x.toPandas()

In [588]:
x1.to_csv('x1.csv')

In [586]:
x.write.options(header='True').csv("x.csv")

Py4JJavaError: An error occurred while calling o8797.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:736)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:271)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:287)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:865)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:547)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:587)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:586)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:586)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:705)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:178)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:979)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:548)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:569)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:592)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:689)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:78)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1814)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1791)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:302)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:326)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:343)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:468)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:439)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:516)
	... 21 more


In [582]:
x.filter((col("year")==2005) & (col("SubModelShort")=="A4") & (col("FuelType")=="UNSPECIFIED")).show(truncate=False)

+----+----+-------+-------------+-----------+-----------+-------------+---+
|year|Make|Model  |SubModelShort|FuelType   |BodyType   |NoofCylinders|CC |
+----+----+-------+-------------+-----------+-----------+-------------+---+
|2005|AUDI|AUDI A4|A4           |UNSPECIFIED|WAGON      |null         |0  |
|2005|AUDI|AUDI A4|A4           |UNSPECIFIED|CAR UNSPEC.|null         |0  |
|2005|AUDI|AUDI A4|A4           |UNSPECIFIED|SEDAN      |null         |0  |
+----+----+-------+-------------+-----------+-----------+-------------+---+



In [506]:
df2.groupBy(['year','Make', 'Model','SubModelShort', 'FuelType', 'BodyType']).agg({'NoofCylinders':'count'}).withColumnRenamed('count(NoofCylinders)','CC').orderBy('CC', ascending=False).show()

+----+----------+--------------------+------------------+--------+--------------------+---+
|year|      Make|               Model|     SubModelShort|FuelType|            BodyType| CC|
+----+----------+--------------------+------------------+--------+--------------------+---+
|2005|      FIAT|         FIAT DUCATO|    DUCATO 18/MAXI|  DIESEL|LCV RECREATIONAL VAN|421|
|2015|VOLKSWAGEN|VOLKSWAGEN TRANSP...|       TRANSPORTER|  DIESEL|LCV VAN/TRUCK UNS...|392|
|2016|VOLKSWAGEN|VOLKSWAGEN TRANSP...|       TRANSPORTER|  DIESEL|LCV VAN/TRUCK UNS...|356|
|2006|      FIAT|         FIAT DUCATO|    DUCATO 18/MAXI|  DIESEL|LCV RECREATIONAL VAN|318|
|2017|VOLKSWAGEN|VOLKSWAGEN TRANSP...|       TRANSPORTER|  DIESEL|LCV VAN/TRUCK UNS...|298|
|2012|      FIAT|         FIAT DUCATO|    DUCATO 40/MAXI|  DIESEL|LCV RECREATIONAL VAN|270|
|2019|VOLKSWAGEN|VOLKSWAGEN TRANSP...|       TRANSPORTER|  DIESEL|LCV VAN/TRUCK UNS...|268|
|2020|      FIAT|         FIAT DUCATO|DUCATO UNSPECIFIED|  DIESEL|LCV RECREATION

In [529]:
m1.show()

+----+----------+--------------------+--------------+---------------+--------------------+------------------+
|year|      Make|               Model| SubModelShort|       FuelType|            BodyType|sum(NoofCylinders)|
+----+----------+--------------------+--------------+---------------+--------------------+------------------+
|2006|ALFA ROMEO|      ALFA ROMEO 166|           166|         DIESEL|               SEDAN|                25|
|1985|       AMC|         AMC UNSPEC.|   UNSPECIFIED|    UNSPECIFIED|         CAR UNSPEC.|              null|
|2004|      AUDI|             AUDI A4|            A4|         DIESEL|               WAGON|               276|
|2021|      AUDI|             AUDI A3|            S3|         PETROL|               SEDAN|                 8|
|2017|      AUDI|             AUDI A5|            A5|         DIESEL|           HATCHBACK|               242|
|2021|      AUDI|        AUDI UNSPEC.|   UNSPECIFIED|ELECTRIC W/OREX|         CAR UNSPEC.|                 0|
|1998|    

In [532]:
# m1.write.format("csv").option("path", "C:\\Users\\akash.mathur\\Downloads\\Akash\\Spark\\m1")

In [None]:
m1.write.format

In [489]:
# x.write.option("header",True).csv("data/csv")

# END

In [533]:
columns = ["language","users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

In [540]:
x = spark.createDataFrame(data).toDF(*columns)

In [None]:
spark.createDataFrame()()

In [542]:
dfFromData2 = spark.createDataFrame(data).toDF(*columns)

In [544]:
'Make' in df.columns

True