In [5]:
# This practice project focuses on data transformation and integration using PySpark

# Setting up Pyspark

In [7]:
# installing required packages
!pip install wget pyspark findspark



In [1]:
import findspark

findspark.init() # initiates a spark session 

In [3]:
# PySpark is the Spark API for Python we use PySpark to initialize the SparkContext.   

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [5]:
# Creating a SparkContext object

sc = SparkContext.getOrCreate()

# Creating a Spark Session

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [6]:
# download datasets
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)

'dataset2 (1).csv'

In [9]:
#load the data into a pyspark dataframe
    
df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)

In [10]:
#print the schema of df1 and df2
    
df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



# Performing some transformations on data

In [17]:
from pyspark.sql.functions import year, quarter, to_date

# Add new column 'year' to df1
df1 = df1.withColumn('year', year(to_date('date_column', 'dd/MM/yyyy')))

# Add new column 'quarter' to df2    
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date', 'dd/MM/yyyy')))

# this adds new date columns with the proper spark format


In [33]:
# Set Spark to use the legacy time parser
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Now this should work
df1.show()
df2.show()
# view the data so far

+-----------+-----------+------------------+----+
|customer_id|date_column|transaction_amount|year|
+-----------+-----------+------------------+----+
|          1|   1/1/2022|              5000|2022|
|          2|  15/2/2022|              1200|2022|
|          3|  20/3/2022|               800|2022|
|          4|  10/4/2022|              3000|2022|
|          5|   5/5/2022|              6000|2022|
|          6|  10/6/2022|              4500|2022|
|          7|  15/7/2022|               200|2022|
|          8|  20/8/2022|              3500|2022|
|          9|  25/9/2022|               700|2022|
|         10| 30/10/2022|              1800|2022|
|         11|  5/11/2022|              2200|2022|
|         12| 10/12/2022|               900|2022|
|         13|  15/1/2023|              4800|2023|
|         14|  20/2/2023|               300|2023|
|         15|  25/3/2023|              4200|2023|
|         16|  30/4/2023|              2600|2023|
|         17|   5/5/2023|               700|2023|


In [29]:
# Renaming columns in both dataframes
df1 = df1.withColumnRenamed('amount', 'transaction_amount')
# Renaming columns in both dataframes
df2 = df2.withColumnRenamed('value', 'transaction_value')

In [31]:
# dropping columns description and loction from df1
df1 = df1.drop('description', 'location')

# dropping notes from df2
df2 = df2.drop('notes')

In [35]:
# join dataframes based on common collumn customer_id
joined_df = df1.join(df2, 'customer_id', 'inner')

In [37]:
joined_df.show()

+-----------+-----------+------------------+----+----------------+-----------------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
|          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|      1|
|          3|  20/3/2022|               800|2022|       20/3/2022|             1000|      1|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      2|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      2|
|          6|  10/6/2022|              4500|2022|       10/6/2022|             1200|      2|
|          7|  15/7/2022|               200|2022|       15/7/2022|              700|      3|
|          8|  20/8/2022|              3500|2022|       20/8/2022|    

In [47]:
# Filtering data based on a condition 
# Foe example filtering joinedf for only transactions where amount is greater than 1000
filtered_df = joined_df.filter('transaction_amount > 1000')

In [41]:
filtered_df.show()

+-----------+-----------+------------------+----+----------------+-----------------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
|          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|      1|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      2|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      2|
|          6|  10/6/2022|              4500|2022|       10/6/2022|             1200|      2|
|          8|  20/8/2022|              3500|2022|       20/8/2022|             3000|      3|
|         10| 30/10/2022|              1800|2022|      30/10/2022|             1200|      4|
|         11|  5/11/2022|              2200|2022|       5/11/2022|    

In [49]:
# Aggregate data by customer 
from pyspark.sql.functions import sum 
# grouping by customer id and aggregating the sum of transaction amount 

total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))

In [51]:
total_amount_per_customer.show()

+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
|         28|        2600|
|         76|        2600|
|         27|        4200|
|         91|        3200|
|         22|        1200|
|         93|        5500|
|          1|        5000|
|         52|        2600|
|         13|        4800|
|          6|        4500|
|         16|        2600|
|         40|        2600|
|         94|        1200|
|         57|        5500|
|         54|        1500|
+-----------+------------+
only showing top 20 rows



In [53]:
# Writing the Result to a Hive table 
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")

Py4JJavaError: An error occurred while calling o90.saveAsTable.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:122)
	at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.createDatabase(InMemoryCatalog.scala:119)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:159)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:140)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$catalog$1(BaseSessionStateBuilder.scala:154)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:123)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:123)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:520)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:652)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:582)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:377)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:969)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:199)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:222)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1125)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1134)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more
