In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import*
from datetime import datetime


In [8]:
# Build (or reuse an already running) SparkSession with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("spark with Hive")
    .enableHiveSupport()
    .getOrCreate()
)

24/02/24 11:49:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [31]:
# Sample rows for the in-memory DataFrame.
# Each compact tuple is (product name, id, calendar day, price); the day string
# is parsed once as a midnight datetime and once with a fixed 10:55:39 time.
_sample_rows = (
    ("Product A", 1001, "2024-03-20", 29.90),
    ("Product B", 1002, "2024-03-21", 49.00),
    ("Product C", 1003, "2024-03-22", 20.40),
    ("Product D", 1004, "2024-03-23", 29.80),
    ("Product E", 1005, "2024-03-24", 89.00),
)

data = [
    [
        product,
        product_id,
        datetime.strptime(day, "%Y-%m-%d"),
        datetime.strptime(day + " 10:55:39", "%Y-%m-%d %H:%M:%S"),
        price,
    ]
    for product, product_id, day, price in _sample_rows
]

In [32]:
# Schema for the sample rows: (column name, Spark type) pairs, all nullable.
_sample_columns = [
    ("Product", StringType()),
    ("ID", IntegerType()),
    ("Date", DateType()),
    ("Timestamp", TimestampType()),
    ("Price", FloatType()),
]
schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _sample_columns]
)

In [33]:
# Materialise the sample rows as a DataFrame using the explicit schema above.
df = spark.createDataFrame(data=data, schema=schema)

In [34]:
# Print column names, types and nullability of the sample DataFrame.
df.printSchema()

root
 |-- Product: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Price: float (nullable = true)



In [35]:
# Display the rows of the sample DataFrame as a text table.
df.show()

                                                                                

+---------+----+----------+-------------------+-----+
|  Product|  ID|      Date|          Timestamp|Price|
+---------+----+----------+-------------------+-----+
|Product A|1001|2024-03-20|2024-03-20 10:55:39| 29.9|
|Product B|1002|2024-03-21|2024-03-21 10:55:39| 49.0|
|Product C|1003|2024-03-22|2024-03-22 10:55:39| 20.4|
|Product D|1004|2024-03-23|2024-03-23 10:55:39| 29.8|
|Product E|1005|2024-03-24|2024-03-24 10:55:39| 89.0|
+---------+----+----------+-------------------+-----+



In [42]:
# Read the order-items CSV from HDFS with an explicit schema:
# schema inference is disabled, the header row is skipped as data, and the
# column names and types come from the StructType defined here.
hdfs_path = '/temp/test/order_items_dataset.csv'

# NOTE: this rebinds `schema` and `df` from the sample-data cells above.
schema = (
    StructType()
    .add("order_id", StringType(), True)
    .add("order_item_id", IntegerType(), True)
    .add("product_id", StringType(), True)
    .add("seller_id", StringType(), True)
    .add("shipping_limit_date", TimestampType(), True)
    .add("price", DoubleType(), True)
    .add("freight_value", DoubleType(), True)
)

csv_reader = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'false')
    .schema(schema)
)
df = csv_reader.load(hdfs_path)

df.printSchema()

df.show(5)







root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

                                                                                

In [55]:
# Second read example: same file, but let Spark infer the column types.
hdfs_path = '/temp/test/order_items_dataset.csv'
df2 = (
    spark.read
    .format('csv')
    .option('header', 'True')
    .option('inferSchema', 'true')
    .load(hdfs_path)
)


                                                                                

In [56]:
# Preview the first two rows of the schema-inferred read.
df2.show(2)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
only showing top 2 rows



In [58]:
# Report the partition count before and after repartitioning into 3 partitions.
partitions_before = df2.rdd.getNumPartitions()
print(f'Number of repartation :{partitions_before}')

df3 = df2.repartition(3)
partitions_after = df3.rdd.getNumPartitions()
print(f'Number of repartation :{partitions_after}')


Number of repartation :2




Number of repartation :3


In [63]:
# Selecting columns in different ways.
# NOTE: this wildcard import is relied on by the cells below (col, year, when, ...).
# It also replaces the Python builtins sum/min/max with the Spark column functions.
from pyspark.sql.functions import *

order_id_col = col('order_id')
limit_date_col = col('shipping_limit_date')

# 1) one column, by name
df3.select('order_id').show(5)
# 2) several columns, by name
df3.select('order_id', 'shipping_limit_date').show(5)
# 3) several columns, as Column objects
df3.select(order_id_col, limit_date_col).show(5)
# 4) Column objects renamed on the fly with alias()
df3.select(order_id_col.alias('oid'), limit_date_col.alias('limit_date')).show(5)


                                                                                

+--------------------+
|            order_id|
+--------------------+
|6299bb8e855289b41...|
|3475576ebe7b1d8da...|
|47399920ee7546351...|
|47399920ee7546351...|
|9209a059ff5ac3476...|
+--------------------+
only showing top 5 rows



                                                                                

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|6076ec81706ee9060...|2018-07-11 08:30:36|
|470a0f13c7dcaed98...|2018-06-26 02:30:30|
|5697c84d8f3e63f51...|2018-04-11 23:10:18|
|8da1b0cac91830c54...|2018-02-21 16:07:37|
|29eaa93d33fc63287...|2018-04-30 12:32:04|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|6076ec81706ee9060...|2018-07-11 08:30:36|
|470a0f13c7dcaed98...|2018-06-26 02:30:30|
|5697c84d8f3e63f51...|2018-04-11 23:10:18|
|8da1b0cac91830c54...|2018-02-21 16:07:37|
|29eaa93d33fc63287...|2018-04-30 12:32:04|
+--------------------+-------------------+
only showing top 5 rows





+--------------------+-------------------+
|                 oid|         limit_date|
+--------------------+-------------------+
|6076ec81706ee9060...|2018-07-11 08:30:36|
|470a0f13c7dcaed98...|2018-06-26 02:30:30|
|5697c84d8f3e63f51...|2018-04-11 23:10:18|
|8da1b0cac91830c54...|2018-02-21 16:07:37|
|29eaa93d33fc63287...|2018-04-30 12:32:04|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

In [69]:
# Derive new columns with withColumn: year and month of the shipping limit timestamp.
shipping_ts = col("shipping_limit_date")
df4 = (
    df3
    .withColumn("year", year(shipping_ts))
    .withColumn("month", month(shipping_ts))
)
df4.select("order_id", "shipping_limit_date", "year", "month").show(5)



+--------------------+-------------------+----+-----+
|            order_id|shipping_limit_date|year|month|
+--------------------+-------------------+----+-----+
|6076ec81706ee9060...|2018-07-11 08:30:36|2018|    7|
|470a0f13c7dcaed98...|2018-06-26 02:30:30|2018|    6|
|5697c84d8f3e63f51...|2018-04-11 23:10:18|2018|    4|
|8da1b0cac91830c54...|2018-02-21 16:07:37|2018|    2|
|29eaa93d33fc63287...|2018-04-30 12:32:04|2018|    4|
+--------------------+-------------------+----+-----+
only showing top 5 rows



                                                                                

In [71]:
# Rename an existing column with withColumnRenamed.
df5 = df4.withColumnRenamed(
    existing="shipping_limit_date",
    new="shipping_limit_datetime",
)
df5.select("order_id", "shipping_limit_datetime").show(5)



+--------------------+-----------------------+
|            order_id|shipping_limit_datetime|
+--------------------+-----------------------+
|6076ec81706ee9060...|    2018-07-11 08:30:36|
|470a0f13c7dcaed98...|    2018-06-26 02:30:30|
|5697c84d8f3e63f51...|    2018-04-11 23:10:18|
|8da1b0cac91830c54...|    2018-02-21 16:07:37|
|29eaa93d33fc63287...|    2018-04-30 12:32:04|
+--------------------+-----------------------+
only showing top 5 rows



                                                                                

In [78]:
# Row filtering examples on the order-items DataFrame (df5).

# 1) Equality match on a single order id.
df5.filter(col("order_id") == '00010242fe8c5a6d1ba2dd792cb16214').show(5)

# 2) Membership test against a list of order ids.
order_list = ['00010242fe8c5a6d1ba2dd792cb16214', '00018f77f2f0320c557190d7a144bdd3']
df5.filter(col("order_id").isin(order_list)).show(5)

# 3) Compound condition. Each comparison is parenthesised because `&` binds
#    tighter than `<` in Python.
#    Runs on df5 like the filters above, not on `df`: that name is rebound
#    earlier in the notebook and lacks the derived year/month columns.
df5.filter((col("price") < 50) & (col("freight_value") < 10)).show(5)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+----

In [80]:
# Drop duplicates: keep one row per (order_id, order_item_id) pair.
dedup_keys = ['order_id', 'order_item_id']
df5.dropDuplicates(subset=dedup_keys).show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13| 239.9|        19.93|2017|    5|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|    2017-02-13 13:57:51| 199.9|        18.14|2017|    2|
|0005a1a1728c9d785...|            1|310ae3c140ff94b03...|a416b6a846a117243...|    2018-03-26 18:31:29|145.95|        11.65|2018|    3|
|00061f2a7bc09da83...|            1|d63c1011f49d98b97...|cc419e0650a3c5ba7...|    2018-03-29 22:28:09| 59.99|         8.88|2018|    3|
|00063b381e2406b52...|            1|f177554ea93259a5b..

                                                                                

In [82]:
# Get distinct rows: distinct() and dropDuplicates() without a column list
# both de-duplicate on the whole row. The rows shown by each call may differ
# because no ordering is applied before show().
df5.distinct().show(5)
df5.dropDuplicates().show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|f280a59cad4cf94ec...|            1|a4aa7c1427c31344e...|66922902710d126a0...|    2017-08-10 16:35:20|110.0|        20.26|2017|    8|
|b0835aaf669bff956...|            1|a6dfe3ac4d3a7c8dc...|17a053fcb14bd2195...|    2018-03-05 08:30:33|199.9|       117.02|2018|    3|
|f68f706f1fd4d5875...|            1|4ce9ab528124f89e0...|7e3f87d16fb353f40...|    2017-12-13 16:51:50| 41.9|         8.72|2017|   12|
|a6ddd2889891733e0...|            2|01084e8138d03dc69...|3092c0b297aacfb4b...|    2018-03-06 12:50:32| 44.9|        11.73|2018|    3|
|fa666f44d0e5c50da...|            2|98354ddeaeae40bd6...|165fc



+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|1fa3133aa187f5523...|            1|1b97621fc0e6ac58c...|dc8798cbf453b7e0f...|    2017-06-07 19:22:56|  24.9|         4.67|2017|    6|
|3fb11bd2ea68c2502...|            1|f908d3bf313a1308b...|25e6ffe976bd75618...|    2017-12-07 13:11:22|  35.0|        11.85|2017|   12|
|1d4e3c08acd5f7023...|            1|87d780fa7d2cf3710...|da8622b14eb17ae28...|    2018-02-23 11:35:40| 109.9|        18.02|2018|    2|
|18ed848509774f56c...|            1|309dd69eb83cea38c...|0b35c634521043bf4...|    2018-02-09 13:30:39| 49.99|         15.1|2018|    2|
|004fb5e6f90a178dc...|            1|601a360bd2a916ece..

                                                                                

In [85]:
# Sorting rows with orderBy.
price_col = col('price')
freight_col = col('freight_value')

# Most expensive items first.
df5.orderBy(price_col.desc()).show(5)
# Cheapest items first; ties on price are broken by the highest freight value.
df5.orderBy(price_col.asc(), freight_col.desc()).show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|0812eb902a67711a1...|            1|489ae2aa008f02150...|e3b4998c7a498169d...|    2017-02-16 20:37:36|6735.0|       194.31|2017|    2|
|fefacc66af859508b...|            1|69c590f7ffc7bf8db...|80ceebb4ee9b31afb...|    2018-08-02 04:05:13|6729.0|       193.21|2018|    8|
|f5136e38d1a14a4db...|            1|1bdf5e6731585cf01...|ee27a8f15b1dded4d...|    2017-06-15 02:45:17|6499.0|       227.66|2017|    6|
|a96610ab360d42a2e...|            1|a6492cc69376c469a...|59417c56835dd8e2e...|    2017-04-18 13:25:18|4799.0|       151.34|2017|    4|
|199af31afc78c699f...|            1|c3ed642d592594bb6..



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|
|8272b63d03f5f79c5...|           16|270516a3f41dc035a...|2709af9587499e95e...|    2017-07-21 18:25:23|  1.2|         7.89|2017|    7|
|8272b63d03f5f79c5...|            7|05b515fdc76e888aa...|2709a

                                                                                

In [88]:
# Group by year and compute price statistics.
# count/avg/sum/min/max are the Spark column functions from the wildcard
# import of pyspark.sql.functions, not the Python builtins.
yearly_price_stats = [
    count('*').alias('total_count'),
    avg('price').alias('avg_price'),
    sum('price').alias('sum_price'),
    min('price').alias('min_price'),
    max('price').alias('max_price'),
]
df5.groupBy('year').agg(*yearly_price_stats).show(5)



+----+-----------+------------------+-----------------+---------+---------+
|year|total_count|         avg_price|        sum_price|min_price|max_price|
+----+-----------+------------------+-----------------+---------+---------+
|2018|      62511| 120.0851568523827|7506643.239999294|     0.85|   6729.0|
|2020|          4|             86.49|           345.96|    69.99|    99.99|
|2016|        370|134.55654054054037|49785.91999999994|      6.0|   1399.0|
|2017|      49765|121.26732804178076| 6034868.57999922|      1.2|   6735.0|
+----+-----------+------------------+-----------------+---------+---------+



                                                                                

In [99]:
# Price statistics per (year, month), listed chronologically.
# sum/min/max are the Spark column functions from the wildcard import of
# pyspark.sql.functions, not the Python builtins.
monthly_summary = df5.groupBy('year', 'month').agg(
    count('*').alias('total_count'),
    avg('price').alias('avg_price'),
    sum('price').alias('sum_price'),
    min('price').alias('min_price'),
    max('price').alias('max_price'),
)
chronological = [col('year').asc(), col('month').asc()]
monthly_summary.orderBy(*chronological).show(5)



+----+-----+-----------+------------------+------------------+---------+---------+
|year|month|total_count|         avg_price|         sum_price|min_price|max_price|
+----+-----+-----------+------------------+------------------+---------+---------+
|2016|    9|          4| 48.61750000000001|194.47000000000003|    44.99|     59.5|
|2016|   10|        365|135.83712328767106| 49580.54999999994|      6.0|   1399.0|
|2016|   12|          1|              10.9|              10.9|     10.9|     10.9|
|2017|    1|        681|117.65747430249657| 80124.74000000017|      2.9|   1999.0|
|2017|    2|       1866| 131.8231564844587|245982.00999999995|      3.9|   6735.0|
+----+-----+-----------+------------------+------------------+---------+---------+
only showing top 5 rows



                                                                                

In [101]:
# Fill missing data with a default value per column.
# The result is only displayed; df5 itself is not modified.
default_values = {'price': 0, 'freight_value': 0}
df5.fillna(default_values).show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|9e4fa2c0dfc591cf6...|            1|84f45695836516442...|4a3ca9315b744ce9f...|    2018-03-05 04:11:39| 92.0|        13.66|2018|    3|
|8925a70c5c2e5b4ce...|            1|54e91dfb43a03717a...|fa1a9dec3a9940c07...|    2017-12-26 15:11:49| 89.9|        35.95|2017|   12|
|a024d7476a847e045...|            1|fbce4c4cb307679d8...|c33847515fa6305ce...|    2018-08-01 03:24:23|149.0|        51.33|2018|    8|
|650d7fd7f019e7381...|            1|cc9c93a7dc6ba4b59...|a3a38f4affed601eb...|    2017-08-03 01:45:14| 89.9|        35.95|2017|    8|
|811155730c44e9b55...|            1|518ef5de2c2b3a255...|28ea4

                                                                                

In [112]:
# Bucket each row by price with a CASE WHEN style expression:
#   price >= 100       -> "High"
#   50 <= price < 100  -> "Medium"
#   anything else      -> "Low"
price = col('price')
price_category = (
    when(price >= 100, "High")
    .when((price < 100) & (price >= 50), "Medium")
    .otherwise("Low")
)
df5.withColumn("price_category", price_category).show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|price_category|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|9e4fa2c0dfc591cf6...|            1|84f45695836516442...|4a3ca9315b744ce9f...|    2018-03-05 04:11:39| 92.0|        13.66|2018|    3|        Medium|
|8925a70c5c2e5b4ce...|            1|54e91dfb43a03717a...|fa1a9dec3a9940c07...|    2017-12-26 15:11:49| 89.9|        35.95|2017|   12|        Medium|
|a024d7476a847e045...|            1|fbce4c4cb307679d8...|c33847515fa6305ce...|    2018-08-01 03:24:23|149.0|        51.33|2018|    8|          High|
|650d7fd7f019e7381...|            1|cc9c93a7dc6ba4b59...|a3a38f4affed601eb...|    2017-08-03 01:45:14| 89.

                                                                                

In [118]:
# Window function: dense rank of each row by ascending price within its year.
from pyspark.sql.window import Window

window1 = Window.partitionBy('year').orderBy(col('price').asc())
price_rank_in_year = dense_rank().over(window1)
df5.withColumn('dense_rank', price_rank_in_year).show(5)

[Stage 121:>                                                        (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|dense_rank|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|         1|
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|         1|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|         1|
|f1d5c2e6867fa93ce...|            1|46fce52cef5caa7cc...|2d2322d8421188677...|    2018-08-28 21:30:15|  2.2|         7.39|2018|   

                                                                                

In [124]:
# Read the second file (sellers). Schema inference stays off, so every column
# is loaded as a string and zip-code prefixes keep their leading zeros.
hdfs_path = '/temp/test/sellers_dataset.csv'
sdf = (
    spark.read
    .format('csv')
    .option('header', 'True')
    .option('inferSchema', 'false')
    .load(hdfs_path)
)
sdf.show(5)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                 04195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
+--------------------+----------------------+-----------------+------------+
only showing top 5 rows



In [126]:
# Join order items to sellers on seller_id. The sellers DataFrame is wrapped in
# broadcast(), and its copy of seller_id is dropped so that only one
# seller_id column remains in the result.
same_seller = df5.seller_id == sdf.seller_id
results1 = (
    df5
    .join(broadcast(sdf), on=same_seller, how='inner')
    .drop(sdf.seller_id)
)
results1.show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+-----------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|seller_zip_code_prefix|seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+-----------+------------+
|9e4fa2c0dfc591cf6...|            1|84f45695836516442...|4a3ca9315b744ce9f...|    2018-03-05 04:11:39| 92.0|        13.66|2018|    3|                 14940|   ibitinga|          SP|
|8925a70c5c2e5b4ce...|            1|54e91dfb43a03717a...|fa1a9dec3a9940c07...|    2017-12-26 15:11:49| 89.9|        35.95|2017|   12|                 88820|      icara|          SC|
|a024d7476a847e045...|            1|fbce4c4cb307679d8...|c33847515fa6305ce...|    2018-08-

In [128]:
# Spark SQL: expose the DataFrames as temporary views and join them with SQL.
df5.createOrReplaceTempView("ORDER_ITEM")
# SELLERS must be backed by the sellers DataFrame (sdf). Registering df5 under
# this name would make the query below a self-join of order items.
sdf.createOrReplaceTempView("SELLERS")
# Lazy: nothing runs until an action (e.g. show()) is called on joinDF2.
joinDF2=spark.sql("select * from order_item oid inner join sellers sid on oid.seller_id== sid.seller_id")

In [133]:
# Write the joined result to HDFS as CSV (with header row), without any partition key.
# mode('overwrite') replaces the output of a previous run. Without it the
# writer's default mode raises an error when the target path already exists,
# so the cell could only ever be executed once.
(
    results1.write
    .mode('overwrite')
    .format('csv')
    .option('header', 'true')
    .option('delimiter', ',')
    .save('/tmp/output_data/resultfinal/')
)
print("write successful")



write successful


                                                                                

In [135]:
# Write the joined result to HDFS as CSV (with header row), partitioned by `year`.
# mode('overwrite') replaces the output of a previous run. Without it the
# writer's default mode raises an error when the target path already exists,
# so the cell could only ever be executed once.
(
    results1.write
    .mode('overwrite')
    .partitionBy('year')
    .format('csv')
    .option('header', 'true')
    .option('delimiter', ',')
    .save('/tmp/output_data/resultfinal1/')
)
print("Write Successfull")



Write Successfull


                                                                                