## Simple ETL pipeline in Databricks 

### Extract data from local : E of the ETL pipeline

In [0]:

# Define the file path (can be DBFS, S3, etc.)
file_path = "/FileStore/tables/forex.csv"

# Reading CSV file into a DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(file_path)

# Show the DataFrame
display(df.limit(10))


slug,date,open,high,low,close,currency
GBP/EGP,2001-04-10,5.5809,5.5947,5.5947,5.5947,EGP
GBP/EGP,2001-06-04,5.47517,5.4939,5.4939,5.4939,EGP
GBP/EGP,2001-08-01,5.6799,5.6543,5.6543,5.6543,EGP
GBP/EGP,2002-07-29,7.217,7.217,7.217,7.217,EGP
GBP/EGP,2003-01-02,7.42429,7.3899,7.3899,7.3899,EGP
GBP/EGP,2003-04-21,9.29241,9.25045,9.25045,9.25045,EGP
GBP/EGP,2004-02-13,11.607,11.642,11.52,11.578,EGP
GBP/EGP,2004-02-16,11.571,11.623,11.499,11.607,EGP
GBP/EGP,2004-02-17,11.608,11.739,11.521,11.662,EGP
GBP/EGP,2004-02-18,11.677,11.772,11.631,11.691,EGP


#### Some checkings I gotta do

##### Check datatypes

In [0]:
print(df.dtypes)

[('slug', 'string'), ('date', 'date'), ('open', 'double'), ('high', 'double'), ('low', 'double'), ('close', 'double'), ('currency', 'string')]


##### Find the max and min date

In [0]:
df.createOrReplaceTempView('date_table')

# Run the SQL query
res = spark.sql("""
    SELECT YEAR(date) as year FROM date_table
""")
res.show(5)

+----+
|year|
+----+
|2001|
|2001|
|2001|
|2002|
|2003|
+----+
only showing top 5 rows



### Transforming the data : T of the ETL pipeline

#### Transformation # 1: Selecting only the last 5 years values

In [0]:
from pyspark.sql import functions as f

df.createOrReplaceTempView("newtable")

res1 = spark.sql("SELECT * FROM newtable WHERE YEAR(date) BETWEEN 2016 AND 2021")
display(res1)


slug,date,open,high,low,close,currency
GBP/EGP,2016-01-01,11.371,11.503,11.371,11.371,EGP
GBP/EGP,2016-01-04,11.376,11.562,11.371,11.373,EGP
GBP/EGP,2016-01-05,11.49,11.505,11.429,11.49,EGP
GBP/EGP,2016-01-06,11.458,11.461,11.399,11.453,EGP
GBP/EGP,2016-01-07,11.419,11.434,11.348,11.42,EGP
GBP/EGP,2016-01-08,11.414,11.432,11.342,11.416,EGP
GBP/EGP,2016-01-11,11.105,11.398,11.098,11.102,EGP
GBP/EGP,2016-01-12,11.353,11.372,11.204,11.351,EGP
GBP/EGP,2016-01-13,11.274,11.311,11.233,11.273,EGP
GBP/EGP,2016-01-14,11.247,11.271,11.208,11.245,EGP


#### Transformation # 2: Finding out the day of the week for each closing stock

In [0]:
res1 = df.withColumn("weekday", f.date_format("date", "EEEE"))
display(res1)

slug,date,open,high,low,close,currency,weekday
GBP/EGP,2001-04-10,5.5809,5.5947,5.5947,5.5947,EGP,Tuesday
GBP/EGP,2001-06-04,5.47517,5.4939,5.4939,5.4939,EGP,Monday
GBP/EGP,2001-08-01,5.6799,5.6543,5.6543,5.6543,EGP,Wednesday
GBP/EGP,2002-07-29,7.217,7.217,7.217,7.217,EGP,Monday
GBP/EGP,2003-01-02,7.42429,7.3899,7.3899,7.3899,EGP,Thursday
GBP/EGP,2003-04-21,9.29241,9.25045,9.25045,9.25045,EGP,Monday
GBP/EGP,2004-02-13,11.607,11.642,11.52,11.578,EGP,Friday
GBP/EGP,2004-02-16,11.571,11.623,11.499,11.607,EGP,Monday
GBP/EGP,2004-02-17,11.608,11.739,11.521,11.662,EGP,Tuesday
GBP/EGP,2004-02-18,11.677,11.772,11.631,11.691,EGP,Wednesday


In [0]:
# Calculate 7-day moving average for 'close' price
from pyspark.sql.window import Window

window_spec = Window.partitionBy("slug").orderBy("date").rowsBetween(-6, 0)  # 7-day window
res1 = res1.withColumn("7 day average", f.round(f.avg("close").over(window_spec), 3))

display(res1)

slug,date,open,high,low,close,currency,weekday,7 day average
AUD/EUR,2003-12-01,0.60121,0.60827,0.60121,0.60798,EUR,Monday,0.608
AUD/EUR,2003-12-02,0.60772,0.60901,0.60379,0.60551,EUR,Tuesday,0.607
AUD/EUR,2003-12-03,0.60558,0.60887,0.60459,0.60764,EUR,Wednesday,0.607
AUD/EUR,2003-12-04,0.60779,0.61002,0.60588,0.60838,EUR,Thursday,0.607
AUD/EUR,2003-12-05,0.60853,0.60872,0.60295,0.60419,EUR,Friday,0.607
AUD/EUR,2003-12-08,0.60441,0.60779,0.60441,0.60544,EUR,Monday,0.607
AUD/EUR,2003-12-09,0.60529,0.60584,0.6035,0.60397,EUR,Tuesday,0.606
AUD/EUR,2003-12-10,0.60386,0.60687,0.60292,0.6054,EUR,Wednesday,0.606
AUD/EUR,2003-12-11,0.60569,0.60624,0.60299,0.60489,EUR,Thursday,0.606
AUD/EUR,2003-12-12,0.60492,0.60661,0.60245,0.60361,EUR,Friday,0.605


### Loading / Writing to the Filestore : L of the ETL

In [0]:
df.write.format('csv').option('header','true').save('/FileStore/tables/newforex.csv')