##### Install pyspark
##### Import and Create SparkSession
##### Read Data from .csv file
##### Create and Add view/table to the spark(Object created from SparkSession).
#####

In [10]:
pip install pyspark



In [11]:
import pyspark


In [12]:
from pyspark.sql import SparkSession

In [13]:
spark = SparkSession.builder.appName('PysparkApp').getOrCreate()

In [14]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7e7d802bf400>




---


### Reading data from .csv file


---



In [15]:
df = spark.read.csv('Region_Master.csv', header=True, inferSchema=True)

In [16]:
df.show()

+------+--------------------+
|REG_NO|         REGION_DESC|
+------+--------------------+
|     1|       1-DKI Jakarta|
|     2|        2-Jawa Barat|
|     3|3-Jawa Timur\x09 ...|
|     4|4-Sulawesi\x09 Ma...|
|     5|        5-Kalimantan|
|     6|          6-Sumatera|
|     7|7-Jawa Tengah dan...|
|     8|                0-HO|
+------+--------------------+





---


### Creating and Adding view to the spark object.



---














In [17]:
df.createOrReplaceTempView('df_view')
result = spark.sql('select * from df_view')
result.show()

+------+--------------------+
|REG_NO|         REGION_DESC|
+------+--------------------+
|     1|       1-DKI Jakarta|
|     2|        2-Jawa Barat|
|     3|3-Jawa Timur\x09 ...|
|     4|4-Sulawesi\x09 Ma...|
|     5|        5-Kalimantan|
|     6|          6-Sumatera|
|     7|7-Jawa Tengah dan...|
|     8|                0-HO|
+------+--------------------+



In [18]:
tables = spark.catalog.listTables()
for table in tables:
  print(table)

Table(name='df_view', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)


In [19]:
#Manipulating ceated view using sql query
query = 'select * from df_view where reg_no%2=0'
output = spark.sql(query)
output.show()   #type(output) ->pyspark.sql.dataframe.DataFrame

+------+--------------------+
|REG_NO|         REGION_DESC|
+------+--------------------+
|     2|        2-Jawa Barat|
|     4|4-Sulawesi\x09 Ma...|
|     6|          6-Sumatera|
|     8|                0-HO|
+------+--------------------+



In [20]:
type(output)

pyspark.sql.dataframe.DataFrame



---


##### Spark DataFrame to Pandas DataFrame


---



In [21]:
pandasDF = output.toPandas()
pandasDF

Unnamed: 0,REG_NO,REGION_DESC
0,2,2-Jawa Barat
1,4,4-Sulawesi\x09 Maluku dan Papua
2,6,6-Sumatera
3,8,0-HO




---


##### Spark DataFrame to Pandas DataFrame


---



In [22]:
#PandasDF (variable) already converted in pandas dataframe, we'll use it here.
pysparkDF = spark.createDataFrame(pandasDF)
# type(pysparkDF)   #pyspark.sql.dataframe.DataFrame
pysparkDF.show()

+------+--------------------+
|REG_NO|         REGION_DESC|
+------+--------------------+
|     2|        2-Jawa Barat|
|     4|4-Sulawesi\x09 Ma...|
|     6|          6-Sumatera|
|     8|                0-HO|
+------+--------------------+





---


### Adding Columns


---



In [23]:
data = spark.read.csv('Region_Master.csv', header=True, inferSchema=True)
data.show()

+------+--------------------+
|REG_NO|         REGION_DESC|
+------+--------------------+
|     1|       1-DKI Jakarta|
|     2|        2-Jawa Barat|
|     3|3-Jawa Timur\x09 ...|
|     4|4-Sulawesi\x09 Ma...|
|     5|        5-Kalimantan|
|     6|          6-Sumatera|
|     7|7-Jawa Tengah dan...|
|     8|                0-HO|
+------+--------------------+



In [24]:
#Adding Column
data.withColumn('new_col', data['reg_no']*10).show()

+------+--------------------+-------+
|REG_NO|         REGION_DESC|new_col|
+------+--------------------+-------+
|     1|       1-DKI Jakarta|     10|
|     2|        2-Jawa Barat|     20|
|     3|3-Jawa Timur\x09 ...|     30|
|     4|4-Sulawesi\x09 Ma...|     40|
|     5|        5-Kalimantan|     50|
|     6|          6-Sumatera|     60|
|     7|7-Jawa Tengah dan...|     70|
|     8|                0-HO|     80|
+------+--------------------+-------+



In [25]:
def new_col_value(val):
  return 'yes' if val%2==0 else 'no'

In [32]:
# spark.catalog.listTables()    #[Table(name='df_view', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

def new_col_value(val):
  return ('yes' if val%2==0 else 'no')


if __name__=='__main__':
  tables = spark.table('df_view')
  tables.show()
  tables.withColumn('new column', new_col_value(tables['REG_NO'])).show()

+------+--------------------+
|REG_NO|         REGION_DESC|
+------+--------------------+
|     1|       1-DKI Jakarta|
|     2|        2-Jawa Barat|
|     3|3-Jawa Timur\x09 ...|
|     4|4-Sulawesi\x09 Ma...|
|     5|        5-Kalimantan|
|     6|          6-Sumatera|
|     7|7-Jawa Tengah dan...|
|     8|                0-HO|
+------+--------------------+



PySparkValueError: ignored

In [30]:
from pyspark.sql.functions import when

def new_col_value(val):
    return when(val % 2 == 0, 'yes').otherwise('no')

if __name__ == '__main__':
    tables = spark.table('df_view')
    tables.show()
    tables.withColumn('new column', new_col_value(tables['REG_NO'])).show()


+------+--------------------+
|REG_NO|         REGION_DESC|
+------+--------------------+
|     1|       1-DKI Jakarta|
|     2|        2-Jawa Barat|
|     3|3-Jawa Timur\x09 ...|
|     4|4-Sulawesi\x09 Ma...|
|     5|        5-Kalimantan|
|     6|          6-Sumatera|
|     7|7-Jawa Tengah dan...|
|     8|                0-HO|
+------+--------------------+

+------+--------------------+----------+
|REG_NO|         REGION_DESC|new column|
+------+--------------------+----------+
|     1|       1-DKI Jakarta|        no|
|     2|        2-Jawa Barat|       yes|
|     3|3-Jawa Timur\x09 ...|        no|
|     4|4-Sulawesi\x09 Ma...|       yes|
|     5|        5-Kalimantan|        no|
|     6|          6-Sumatera|       yes|
|     7|7-Jawa Tengah dan...|        no|
|     8|                0-HO|       yes|
+------+--------------------+----------+



In [34]:
print(type(input()))

13
<class 'str'>




---



---



In [None]:
readdf=spark.read.format("jdbc").option("url","jdbc:oracle:thin:@blrtrnorcldb001.fintellix.com:1521:ORCLCDB").\
option("dbtable",'emp').\
option("user","i41621").\
option("password","Welcome1").\
option("driver","oracle.jdbc.driver.OracleDriver").\
load()

Py4JJavaError: ignored

In [None]:
df = spark.read.csv('data.csv', header=True)

In [None]:
df.show()

+---------------------+-------------+----------+---------------+---------+--------------+---------+---------+------------+-----------+-----------------+-----------------+-----------------+-----------------+-------------+----------------+------------+----------------+-------------+------------------+----------+-------------+-----------+-------------------+-----------------+------------------+---------------+-----------------+---------------+----------------+--------------+------------------+----------------+------------+--------------------+-----------------------+
|PRODUCT_INSTANCE_BKEY|IDENTITY_CODE|AS_OF_DATE|ISO_CURRENCY_CD|AMRT_TERM|AMRT_TERM_MULT|BRANCH_CD|  CIF_KEY|CUR_BOOK_BAL|DATA_SOURCE|GEOGRAPHIC_LOC_CD|LAST_PAYMENT_DATE|LAST_REPRICE_DATE|MARKET_SEGMENT_CD|MATURITY_DATE|OPEN_ACCOUNT_FLG|ORG_BOOK_BAL|ORIGINATION_DATE|REMAIN_TERM_C|REMAIN_TERM_MULT_C|ISSUE_DATE|BDI_LOB_XSELL|CUSTOMER_ID|SERVICE_OUTLET_BKEY|PRODUCT_TYPE_BKEY|Prod_Sub_Type_Code|ACCOUNT_OFFICER|ACCOUNT_OPEN_DATE|A

In [None]:
df.count() #33599

33599

## Null Value Handling


#### Importing col

In [None]:
from pyspark.sql.functions import col

In [None]:
df.filter(col('OverDraft_Status_Ind').isNull()).show()

+---------------------+-------------+----------+---------------+---------+--------------+---------+---------+------------+-----------+-----------------+-----------------+-----------------+-----------------+-------------+----------------+------------+----------------+-------------+------------------+----------+-------------+-----------+-------------------+-----------------+------------------+---------------+-----------------+---------------+----------------+--------------+------------------+----------------+------------+--------------------+-----------------------+
|PRODUCT_INSTANCE_BKEY|IDENTITY_CODE|AS_OF_DATE|ISO_CURRENCY_CD|AMRT_TERM|AMRT_TERM_MULT|BRANCH_CD|  CIF_KEY|CUR_BOOK_BAL|DATA_SOURCE|GEOGRAPHIC_LOC_CD|LAST_PAYMENT_DATE|LAST_REPRICE_DATE|MARKET_SEGMENT_CD|MATURITY_DATE|OPEN_ACCOUNT_FLG|ORG_BOOK_BAL|ORIGINATION_DATE|REMAIN_TERM_C|REMAIN_TERM_MULT_C|ISSUE_DATE|BDI_LOB_XSELL|CUSTOMER_ID|SERVICE_OUTLET_BKEY|PRODUCT_TYPE_BKEY|Prod_Sub_Type_Code|ACCOUNT_OFFICER|ACCOUNT_OPEN_DATE|A

#### Without using col (Null Values)

In [None]:
# Data where OverDraft_Status_Ind is NUll
# df.count()
df.filter(df.OverDraft_Status_Ind.isNull()).count()
# df.filter("OverDraft_Status_Ind is NULL").count()

26717

In [None]:
df.na.fill(value=0, subset='OverDraft_Status_Ind').show()

+---------------------+-------------+----------+---------------+---------+--------------+---------+---------+------------+-----------+-----------------+-----------------+-----------------+-----------------+-------------+----------------+------------+----------------+-------------+------------------+----------+-------------+-----------+-------------------+-----------------+------------------+---------------+-----------------+---------------+----------------+--------------+------------------+----------------+------------+--------------------+-----------------------+
|PRODUCT_INSTANCE_BKEY|IDENTITY_CODE|AS_OF_DATE|ISO_CURRENCY_CD|AMRT_TERM|AMRT_TERM_MULT|BRANCH_CD|  CIF_KEY|CUR_BOOK_BAL|DATA_SOURCE|GEOGRAPHIC_LOC_CD|LAST_PAYMENT_DATE|LAST_REPRICE_DATE|MARKET_SEGMENT_CD|MATURITY_DATE|OPEN_ACCOUNT_FLG|ORG_BOOK_BAL|ORIGINATION_DATE|REMAIN_TERM_C|REMAIN_TERM_MULT_C|ISSUE_DATE|BDI_LOB_XSELL|CUSTOMER_ID|SERVICE_OUTLET_BKEY|PRODUCT_TYPE_BKEY|Prod_Sub_Type_Code|ACCOUNT_OFFICER|ACCOUNT_OPEN_DATE|A

In [None]:
df.fillna(0, subset=['AMRT_TERM']).show()

+---------------------+-------------+----------+---------------+---------+--------------+---------+---------+------------+-----------+-----------------+-----------------+-----------------+-----------------+-------------+----------------+------------+----------------+-------------+------------------+----------+-------------+-----------+-------------------+-----------------+------------------+---------------+-----------------+---------------+----------------+--------------+------------------+----------------+------------+--------------------+-----------------------+
|PRODUCT_INSTANCE_BKEY|IDENTITY_CODE|AS_OF_DATE|ISO_CURRENCY_CD|AMRT_TERM|AMRT_TERM_MULT|BRANCH_CD|  CIF_KEY|CUR_BOOK_BAL|DATA_SOURCE|GEOGRAPHIC_LOC_CD|LAST_PAYMENT_DATE|LAST_REPRICE_DATE|MARKET_SEGMENT_CD|MATURITY_DATE|OPEN_ACCOUNT_FLG|ORG_BOOK_BAL|ORIGINATION_DATE|REMAIN_TERM_C|REMAIN_TERM_MULT_C|ISSUE_DATE|BDI_LOB_XSELL|CUSTOMER_ID|SERVICE_OUTLET_BKEY|PRODUCT_TYPE_BKEY|Prod_Sub_Type_Code|ACCOUNT_OFFICER|ACCOUNT_OPEN_DATE|A

In [None]:
df.collect()[0]

Row(PRODUCT_INSTANCE_BKEY='CH-865526978524', IDENTITY_CODE='20131129', AS_OF_DATE='29-11-2013', ISO_CURRENCY_CD='IDR', AMRT_TERM=None, AMRT_TERM_MULT=None, BRANCH_CD='32000', CIF_KEY='33843933', CUR_BOOK_BAL='0', DATA_SOURCE='11', GEOGRAPHIC_LOC_CD='3', LAST_PAYMENT_DATE=None, LAST_REPRICE_DATE='29-11-2013', MARKET_SEGMENT_CD='12', MATURITY_DATE='01-01-2014', OPEN_ACCOUNT_FLG='2', ORG_BOOK_BAL='0', ORIGINATION_DATE='29-11-2013', REMAIN_TERM_C='33', REMAIN_TERM_MULT_C='D', ISSUE_DATE='17-04-2012', BDI_LOB_XSELL='0', CUSTOMER_ID='112655', SERVICE_OUTLET_BKEY='3200', PRODUCT_TYPE_BKEY='DEMD', Prod_Sub_Type_Code='SAVG', ACCOUNT_OFFICER='RM-71', ACCOUNT_OPEN_DATE='27-03-1997', AVG_Monthly_BAL='406468', Accrued Interest='16258.72', Debit_Interest='2276.2208', Unpaid_Item_charge='2003.074304', Min_Bal_Required='10000', Account_type='Savings', OverDraft_Status_Ind='OD', Account_Maintenance_Fee='325.1744')

In [None]:
df.show(n=2)

+---------------------+-------------+----------+---------------+---------+--------------+---------+--------+------------+-----------+-----------------+-----------------+-----------------+-----------------+-------------+----------------+------------+----------------+-------------+------------------+----------+-------------+-----------+-------------------+-----------------+------------------+---------------+-----------------+---------------+----------------+--------------+------------------+----------------+------------+--------------------+-----------------------+
|PRODUCT_INSTANCE_BKEY|IDENTITY_CODE|AS_OF_DATE|ISO_CURRENCY_CD|AMRT_TERM|AMRT_TERM_MULT|BRANCH_CD| CIF_KEY|CUR_BOOK_BAL|DATA_SOURCE|GEOGRAPHIC_LOC_CD|LAST_PAYMENT_DATE|LAST_REPRICE_DATE|MARKET_SEGMENT_CD|MATURITY_DATE|OPEN_ACCOUNT_FLG|ORG_BOOK_BAL|ORIGINATION_DATE|REMAIN_TERM_C|REMAIN_TERM_MULT_C|ISSUE_DATE|BDI_LOB_XSELL|CUSTOMER_ID|SERVICE_OUTLET_BKEY|PRODUCT_TYPE_BKEY|Prod_Sub_Type_Code|ACCOUNT_OFFICER|ACCOUNT_OPEN_DATE|AVG

In [None]:
from pyspark.sql.functions import coalesce, expr

In [None]:
df.withColumn('AMRT_TERM', expr("nvl(AMRT_TERM, 0)")).show()

+---------------------+-------------+----------+---------------+---------+--------------+---------+---------+------------+-----------+-----------------+-----------------+-----------------+-----------------+-------------+----------------+------------+----------------+-------------+------------------+----------+-------------+-----------+-------------------+-----------------+------------------+---------------+-----------------+---------------+----------------+--------------+------------------+----------------+------------+--------------------+-----------------------+
|PRODUCT_INSTANCE_BKEY|IDENTITY_CODE|AS_OF_DATE|ISO_CURRENCY_CD|AMRT_TERM|AMRT_TERM_MULT|BRANCH_CD|  CIF_KEY|CUR_BOOK_BAL|DATA_SOURCE|GEOGRAPHIC_LOC_CD|LAST_PAYMENT_DATE|LAST_REPRICE_DATE|MARKET_SEGMENT_CD|MATURITY_DATE|OPEN_ACCOUNT_FLG|ORG_BOOK_BAL|ORIGINATION_DATE|REMAIN_TERM_C|REMAIN_TERM_MULT_C|ISSUE_DATE|BDI_LOB_XSELL|CUSTOMER_ID|SERVICE_OUTLET_BKEY|PRODUCT_TYPE_BKEY|Prod_Sub_Type_Code|ACCOUNT_OFFICER|ACCOUNT_OPEN_DATE|A

SyntaxError: ignored