In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession that runs against a local master under the
# application name 'demoApp'.
sparkdriver = (
    SparkSession.builder
    .master('local')
    .appName('demoApp')
    .getOrCreate()
)
# Echo the session object as the cell result.
sparkdriver

In [3]:
# Read the sales CSV: the first row supplies the column names and the column
# types are inferred from the data.
pyspark_df = sparkdriver.read.csv(
    'supermarket_sales.csv',
    inferSchema=True,
    header=True,
)

In [4]:
# Preview the first five rows of the loaded DataFrame.
pyspark_df.show(n=5)

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date| Time|    Payment|  cogs|gross margin percentage|gross income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715| 1/5/2019|13:08|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22| 3/8/2019|10:29|       Cash|  76.4|            4.761904762|        3.82|   9.6|
|631-41-3108|     A|   Yangon|       Normal|  Male|  Ho

# Default Parallelism in Spark #

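### In local mode the default parallelism comes from the Spark master setting (here `local`, a single thread), no matter how many nodes Hadoop or HDFS has ###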

In [7]:
# Display the SparkContext's default parallelism as the cell result
# (1 in the recorded run, where the session was built with master 'local').
sparkdriver.sparkContext.defaultParallelism

1

In [6]:
# Same value as the bare-expression cell, but written to stdout via print().
print(sparkdriver.sparkContext.defaultParallelism)

1


In [10]:
# Keep only the rows whose Gender column equals "Female", then preview ten of them.
female_rows = pyspark_df.where('Gender="Female"')
female_rows.show(10)

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date| Time|    Payment|  cogs|gross margin percentage|gross income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715| 1/5/2019|13:08|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22| 3/8/2019|10:29|       Cash|  76.4|            4.761904762|        3.82|   9.6|
|355-53-5943|     A|   Yangon|       Member|Female|Elec

In [11]:
# Brings `col` and `when` (used by the cells below) into scope.
# NOTE(review): the wildcard import also rebinds Python builtins such as
# sum/min/max/round to their Spark column-function counterparts; consider
# importing only the names that are needed once all dependent cells are known.
from pyspark.sql.functions import *

In [13]:
# Cast Quantity to int, then show it next to Quantity divided by 10
# for the first ten rows.
(
    pyspark_df
    .withColumn('Quantity', col('Quantity').cast('int'))
    .select('Quantity', col('Quantity') / 10)
    .show(10)
)

+--------+---------------+
|Quantity|(Quantity / 10)|
+--------+---------------+
|       7|            0.7|
|       5|            0.5|
|       7|            0.7|
|       8|            0.8|
|       7|            0.7|
|       7|            0.7|
|       6|            0.6|
|      10|            1.0|
|       2|            0.2|
|       3|            0.3|
+--------+---------------+
only showing top 10 rows



### Adding the result as a new column with `withColumn`

In [15]:
# Cast Quantity to int and append QuantityNew (Quantity divided by 10),
# keeping every other column; preview the first ten rows.
(
    pyspark_df
    .withColumn('Quantity', col('Quantity').cast('int'))
    .withColumn('QuantityNew', col('Quantity') / 10)
    .show(10)
)

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+-----------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date| Time|    Payment|  cogs|gross margin percentage|gross income|Rating|QuantityNew|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+-----------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715| 1/5/2019|13:08|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|        0.7|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22| 3/8/2019|10:29|       Cash|  76.4|            4.761904762|        3.82|   9.6|        

In [17]:
# List the distinct values found in the Gender column.
(
    pyspark_df
    .select('Gender')
    .distinct()
    .show()
)

+------+
|Gender|
+------+
|Female|
|  Male|
+------+



In [19]:
# Distinct City values: print them, then return how many there are as the
# cell result.
city_values = pyspark_df.select('City').distinct()
city_values.show()
city_values.count()

+---------+
|     City|
+---------+
|Naypyitaw|
| Mandalay|
|   Yangon|
+---------+



3

In [24]:
# Map each City to a severity label: Naypyitaw -> High, Mandalay -> Mid,
# every other city -> Low.
severity_by_city = (
    when(col('City') == "Naypyitaw", 'High')
    .when(col('City') == "Mandalay", 'Mid')
    .otherwise('Low')
)
# Attach the label as a new 'severity' column and preview five rows.
pyspark_df.withColumn('severity', severity_by_city).show(5)

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+--------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date| Time|    Payment|  cogs|gross margin percentage|gross income|Rating|severity|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+--------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715| 1/5/2019|13:08|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|     Low|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22| 3/8/2019|10:29|       Cash|  76.4|            4.761904762|        3.82|   9.6|    High|
|631-41-31

### Querying with SQL using temporary views or permanent tables ###

In [25]:
# Expose the DataFrame to Spark SQL under the name 'tempTable1'.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run because it overwrites any existing view of the same name.
pyspark_df.createOrReplaceTempView('tempTable1')

DataFrame[count(1): bigint]

In [27]:
# Run the row-count query against the temp view and keep the resulting
# DataFrame. show() only prints and returns None, so it must not be part of
# the assignment, otherwise `data` would always be None.
data = sparkdriver.sql('select count(*) from temptable1')
data.show()

+--------+
|count(1)|
+--------+
|    1000|
+--------+



In [29]:
# List the databases known to the session before any new one is created.
(
    sparkdriver
    .sql('show databases')
    .show()
)

+---------+
|namespace|
+---------+
|  default|
+---------+



In [30]:
# Create the db1 database. IF NOT EXISTS makes the cell idempotent, so
# re-running it does not fail once db1 already exists.
sparkdriver.sql('create database if not exists db1')

DataFrame[]

In [31]:
# List the databases again; the recorded output now includes db1 next to default.
(
    sparkdriver
    .sql('show databases')
    .show()
)

+---------+
|namespace|
+---------+
|      db1|
|  default|
+---------+



In [32]:
# List the tables visible in the `default` database; in the recorded output
# this is only the temporary view registered earlier.
(
    sparkdriver
    .sql('show tables in default')
    .show()
)

+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
|        |temptable1|       true|
+--------+----------+-----------+



In [34]:
# Read the second CSV with inferred column types.
# NOTE(review): unlike the first load, `header` is not set here, so the first
# row is read as data — confirm this file really has no header row.
pyspark_df1 = sparkdriver.read.csv(
    'supermarket_sales1.csv',
    inferSchema=True,
)

In [35]:
# Persist the second DataFrame as the table db1.permtable. The default save
# mode raises when the table already exists; mode('overwrite') replaces it
# instead, so this cell can be re-run.
pyspark_df1.write.mode('overwrite').saveAsTable('db1.permtable')
# The table was written to db1, so listing `default` still shows only the
# temporary view.
sparkdriver.sql('show tables in default').show()

+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
|        |temptable1|       true|
+--------+----------+-----------+



In [36]:
# List the tables visible in db1; the recorded output shows the saved
# permtable together with the temporary view.
(
    sparkdriver
    .sql('show tables in db1')
    .show()
)

+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
|     db1| permtable|      false|
|        |temptable1|       true|
+--------+----------+-----------+

