In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [2]:
spark = SparkSession.builder.appName("test").config("spark.driver.memory", "40g").getOrCreate()

In [3]:
df = spark.read.option("header", True).csv("wfp_food_prices_ecu.csv")

In [4]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- admin1: string (nullable = true)
 |-- admin2: string (nullable = true)
 |-- market: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- category: string (nullable = true)
 |-- commodity: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- priceflag: string (nullable = true)
 |-- pricetype: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- price: string (nullable = true)
 |-- usdprice: string (nullable = true)



In [5]:
csv_schema = StructType(fields = [StructField ("date", StringType(), True ),
                                  StructField ("admin1", StringType(), True ),
                                  StructField ("admin2", StringType(), True ),
                                  StructField ("market", StringType(), True ),
                                  StructField ("latitude", StringType(), True ),
                                  StructField ("longitude", StringType(), True ),
                                  StructField ("category", StringType(), True ),
                                  StructField ("commodity", StringType(), True ),
                                  StructField ("unit", StringType(), True ),
                                  StructField ("priceflag", StringType(), True ),
                                  StructField ("pricetype", StringType(), True ),
                                  StructField ("currency", StringType(), True ),
                                  StructField ("price", StringType(), True ),
                                  StructField ("usdprice", StringType(), True ),
                                 ])

In [6]:
df = spark.read.option("header", True)\
.schema(csv_schema)\
.csv("wfp_food_prices_ecu.csv")

In [7]:
#select only required columns
#style-1
selected_df = df.select("date","category","unit","currency","price")

In [8]:
selected_df.show(5)

+----------+------------------+----------+---------+------+
|      date|          category|      unit| currency| price|
+----------+------------------+----------+---------+------+
|     #date|        #item+type|#item+unit|#currency|#value|
|2005-01-15|cereals and tubers|        KG|      USD|  0.23|
|2005-01-15|cereals and tubers|        KG|      USD|  0.64|
|2005-01-15|cereals and tubers|        KG|      USD|  0.22|
|2005-01-15|cereals and tubers|        KG|      USD|  0.51|
+----------+------------------+----------+---------+------+
only showing top 5 rows



In [9]:
#style-2
selected_df = df.select(df.date,df.category,df.unit,df.currency,df.price)

In [10]:
selected_df.show(5)

+----------+------------------+----------+---------+------+
|      date|          category|      unit| currency| price|
+----------+------------------+----------+---------+------+
|     #date|        #item+type|#item+unit|#currency|#value|
|2005-01-15|cereals and tubers|        KG|      USD|  0.23|
|2005-01-15|cereals and tubers|        KG|      USD|  0.64|
|2005-01-15|cereals and tubers|        KG|      USD|  0.22|
|2005-01-15|cereals and tubers|        KG|      USD|  0.51|
+----------+------------------+----------+---------+------+
only showing top 5 rows



In [11]:
#style-3
from pyspark.sql.functions import col


In [12]:
#can rename as well
selected_df = df.select(col("date"), col("category"),col("unit"),col("currency"), col("price").alias("usd_price"))

In [13]:
#style-4
selected_df = df.select(df["date"],df["category"],df["unit"],df["currency"],df["price"])

In [14]:
#rename column using withColumnRenamed
renamed_df = df.withColumnRenamed("category","type")\
.withColumnRenamed("price","npr")

In [17]:
from pyspark.sql.functions import current_timestamp,lit
#add new column
final_df = selected_df.withColumn("ingestion_date", current_timestamp())\
.withColumn("env", lit("Production"))

In [20]:
final_df.show(5, False) #truncate = False

+----------+------------------+----------+---------+------+--------------------------+----------+
|date      |category          |unit      |currency |price |ingestion_date            |env       |
+----------+------------------+----------+---------+------+--------------------------+----------+
|#date     |#item+type        |#item+unit|#currency|#value|2024-01-24 09:32:42.720029|Production|
|2005-01-15|cereals and tubers|KG        |USD      |0.23  |2024-01-24 09:32:42.720029|Production|
|2005-01-15|cereals and tubers|KG        |USD      |0.64  |2024-01-24 09:32:42.720029|Production|
|2005-01-15|cereals and tubers|KG        |USD      |0.22  |2024-01-24 09:32:42.720029|Production|
|2005-01-15|cereals and tubers|KG        |USD      |0.51  |2024-01-24 09:32:42.720029|Production|
+----------+------------------+----------+---------+------+--------------------------+----------+
only showing top 5 rows



In [23]:
#write dataframe
#write in parquet format
final_df.write.mode("overwrite").parquet("C:/Users/bibhusha.ojha_genese/Desktop/SparkSQL/datalake")

In [24]:
df = spark.read.parquet("C:/Users/bibhusha.ojha_genese/Desktop/SparkSQL/datalake")

In [26]:
df.show(5, False)

+----------+------------------+----------+---------+------+--------------------------+----------+
|date      |category          |unit      |currency |price |ingestion_date            |env       |
+----------+------------------+----------+---------+------+--------------------------+----------+
|#date     |#item+type        |#item+unit|#currency|#value|2024-01-24 09:37:40.092338|Production|
|2005-01-15|cereals and tubers|KG        |USD      |0.23  |2024-01-24 09:37:40.092338|Production|
|2005-01-15|cereals and tubers|KG        |USD      |0.64  |2024-01-24 09:37:40.092338|Production|
|2005-01-15|cereals and tubers|KG        |USD      |0.22  |2024-01-24 09:37:40.092338|Production|
|2005-01-15|cereals and tubers|KG        |USD      |0.51  |2024-01-24 09:37:40.092338|Production|
+----------+------------------+----------+---------+------+--------------------------+----------+
only showing top 5 rows



In [27]:
#partition
final_df.write.mode("overwrite").partitionBy("date").parquet("C:/Users/bibhusha.ojha_genese/Desktop/SparkSQL/datalake")