In [1]:
import requests
import datetime
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

## API Base URL

`https://www.nrb.org.np/api/forex/v1/`

## Endpoints

### `GET /rates`

Returns Foreign Exchange Rates for a given range.

#### Parameters

- **page**: Current Page
- **per_page**: Number of items to show per page
- **from**: Starting date (In Y-m-d format)
- **to**: Ending date (In Y-m-d format)

#### Responses Description

- **status**
  - **code**: Status Code
    - `200`: OK
    - `400`: Bad Request / Invalid Arguments

- **errors**
  - **validation**
    - **per_page**
      - "Per Page is required"
      - "Per Page must be an integer"
      - "Per Page must be at least 1"
      - "Per Page must be no more than 100"
    - **page**
      - "Page is required"
      - "Page must be an integer"
    - **from**
      - "From is required"
      - "From must be date with format 'Y-m-d'"
    - **to**
      - "To is required"

- **params**: GET Parameters of the API endpoint
  - **per_page**
  - **page**
  - **from**
  - **to**

- **data**
  - **payload**: Contains an array of Foreign Exchange rates of different dates
    - **date**: FOREX Rates for this date
    - **published_on**: FOREX Rates publish date
    - **modified_on**: FOREX Rates last modified date
    - **rates**: Array of rates
      - **currency**
        - **unit**
        - **name**
        - **ISO3**
        - **buy**: Currency Buying Rate (in NRs.)
        - **sell**: Currency Selling Rate (in NRs.)

- **pagination**
  - **page**: Current Page (Cursor)
  - **pages**: Total number of pages
  - **per_page**: Items per page
  - **total**: Total number of items
  - **links**
    - **prev**: Link for prev


<h1>Fetch Total No of Pages for given Date Window</h1>

In [2]:
def fetchTotalPages(start_date, end_date):
    url = "https://www.nrb.org.np/api/forex/v1/rates"
    params = {
        'from': start_date,
        'to': end_date,
        'per_page': 100,
        'page': 1
    }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        data = response.json()
        if data.get('status', {}).get('code') == 200:
            return data.get('pagination', {}).get('pages', 1)
    return 1

print(fetchTotalPages("2014-07-23","2024-07-23"))

37


<h1>Fetch Data Using Api</h1>

In [3]:
def fetchData(start_date, end_date):
    url = "https://www.nrb.org.np/api/forex/v1/rates"
    per_page = 100
    total_page = fetchTotalPages(start_date, end_date)
    # print(f"Total pages: {total_page}")  # Debugging
    page = 1
    all_records = []
    
    while page <= total_page:
        params = {
            'from': start_date,
            'to': end_date,
            'per_page': per_page,
            'page': page
        }
        response = requests.get(url, params=params)
        data = response.json()  # get the response in json format
        # print(f"Response for page {page}: {data}")  # Debugging

        if data.get('status', {}).get('code') == 200:
            for entry in data.get('data', {}).get('payload', []):
                date = entry.get('date')
                for rate in entry.get('rates', []): 
                    record = {
                        'date': date,
                        'Name': rate.get('currency', {}).get('name'),
                        'ISO3': rate.get('currency', {}).get('iso3'),
                        'Unit': rate.get('currency', {}).get('unit'),
                        'BuyRate': rate.get('buy'),
                        'SellRate': rate.get('sell')
                    }
                    all_records.append(record)
        else:
            print(f"Error response code: {data.get('status', {}).get('code')}")
            break

        page += 1

    return all_records

# data=fetchData(start_date,end_date)
# print(data)

<h1>Convert Response to Spark Dataframe</h1>

In [4]:
def createDataframe(spark,data):
    schema = StructType([
        # StructField("Id", StringType(), True), 
        StructField("date", StringType(), True),
        StructField("Unit", StringType(), True),
        StructField("Name", StringType(), True),
        StructField("ISO3", StringType(), True),
        StructField("BuyRate", StringType(), True),
        StructField("SellRate", StringType(), True)
    ])

    df=spark.createDataFrame(data,schema=schema)

    df = df.withColumn("SOURCE", lit("NRB"))
    # df = df.withColumn("date", to_date(df["date"], "yyyy-MM-dd"))
    df = df.withColumnRenamed("date", "RateDate")

    # df = df.withColumn("RateDate", to_date(col("date"), "yyyy-MM-dd"))
    # df = df.drop("date")

    df = df.withColumn("BuyRate",df["BuyRate"].cast(FloatType()))
    df = df.withColumn("SellRate",df["SellRate"].cast(FloatType()))

    window_spec = Window.orderBy("RateDate")# Order the DataFrame by Date and add an incremental ID
    df = df.withColumn("ID", row_number().over(window_spec).cast(IntegerType())) # Add an incremental ID column

    df = df.withColumnRenamed("Name", "CurrencyName")
    df = df.withColumnRenamed("ISO3", "CurrencyCode")


    final_df = df.select(
    "ID",
    "RateDate",
    "CurrencyCode",
    "CurrencyName",
    "Unit",
    "BuyRate",
    "SellRate",
    "SOURCE"
    )

    return final_df

<h1>10 Years Window</h1>

In [5]:
start_date = (datetime.datetime.now() - datetime.timedelta(days=3653)).strftime("%Y-%m-%d")  # Approximate 10 years from now
end_date = datetime.datetime.now().strftime("%Y-%m-%d")

<h1>SparkSession</h1>

In [6]:
spark=SparkSession.builder \
.appName("Forex Data ETL") \
.master("local[*]") \
.config("spark.jars", "/usr/share/java/mysql-connector-j-9.0.0.jar") \
.getOrCreate()

Picked up _JAVA_OPTIONS: -Xmx2G -XX:+UseG1GC
Picked up _JAVA_OPTIONS: -Xmx2G -XX:+UseG1GC
24/07/23 14:43:17 WARN Utils: Your hostname, Nitro resolves to a loopback address: 127.0.1.1; using 10.13.164.84 instead (on interface wlp0s20f3)
24/07/23 14:43:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/07/23 14:43:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


<h1>Extract Data</h1>

In [7]:
data=fetchData(start_date,end_date)
# print(data)

<h1>Transform</h1>

In [8]:
df=createDataframe(spark,data)
df.show(truncate=False)

24/07/23 14:44:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:44:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:44:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+---+----------+------------+-------------------+----+-------+--------+------+
|ID |RateDate  |CurrencyCode|CurrencyName       |Unit|BuyRate|SellRate|SOURCE|
+---+----------+------------+-------------------+----+-------+--------+------+
|1  |2014-07-23|GBP         |UK Pound Sterling  |1   |163.89 |165.0   |NRB   |
|2  |2014-07-23|BHD         |Bahrain Dinar      |1   |254.89 |0.0     |NRB   |
|3  |2014-07-23|INR         |Indian Rupee       |100 |160.0  |160.15  |NRB   |
|4  |2014-07-23|USD         |U.S. Dollar        |1   |96.09  |96.69   |NRB   |
|5  |2014-07-23|EUR         |European Euro      |1   |129.59 |130.4   |NRB   |
|6  |2014-07-23|CHF         |Swiss Franc        |1   |106.64 |107.3   |NRB   |
|7  |2014-07-23|AUD         |Australian Dollar  |1   |90.21  |90.77   |NRB   |
|8  |2014-07-23|CAD         |Canadian Dollar    |1   |89.39  |89.95   |NRB   |
|9  |2014-07-23|SGD         |Singapore Dollar   |1   |77.45  |77.93   |NRB   |
|10 |2014-07-23|JPY         |Japanese Yen       |10 

In [9]:
df = df.withColumn("RateDate", to_date(df["RateDate"], "yyyy-MM-dd")) \
    .withColumn("RateDate", date_format("RateDate", "MM/dd/yyyy"))
df.show(df.count(),truncate=False)

24/07/23 14:45:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:45:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:45:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+-----+----------+------------+-------------------+----+-------+--------+------+
|ID   |RateDate  |CurrencyCode|CurrencyName       |Unit|BuyRate|SellRate|SOURCE|
+-----+----------+------------+-------------------+----+-------+--------+------+
|1    |07/23/2014|INR         |Indian Rupee       |100 |160.0  |160.15  |NRB   |
|2    |07/23/2014|USD         |U.S. Dollar        |1   |96.09  |96.69   |NRB   |
|3    |07/23/2014|EUR         |European Euro      |1   |129.59 |130.4   |NRB   |
|4    |07/23/2014|GBP         |UK Pound Sterling  |1   |163.89 |165.0   |NRB   |
|5    |07/23/2014|CHF         |Swiss Franc        |1   |106.64 |107.3   |NRB   |
|6    |07/23/2014|AUD         |Australian Dollar  |1   |90.21  |90.77   |NRB   |
|7    |07/23/2014|CAD         |Canadian Dollar    |1   |89.39  |89.95   |NRB   |
|8    |07/23/2014|SGD         |Singapore Dollar   |1   |77.45  |77.93   |NRB   |
|9    |07/23/2014|JPY         |Japanese Yen       |10  |9.46   |9.52    |NRB   |
|10   |07/23/2014|CNY       

<h1>Dump csv</h2>

In [None]:
# spark.conf.set("spark.hadoop.fs.defaultFS", "file:///")
# spark.sparkContext.setLogLevel("DEBUG")

In [12]:
repartitioned_df=df.repartition(1)
try:
    # repartitioned_df.write.mode("overwrite").option("header", "true").csv("/home/avyuthan-shah/Desktop/F1Intern/Task3/forex.csv")
    repartitioned_df.write \
    .option("header","true") \
    .option("sep",",") \
    .mode("overwrite") \
    .csv("/home/avyuthan-shah/Desktop/F1Intern/Task3/forex")

except Exception as e:
    print(f"Error writing DataFrame: {e}")


24/07/23 14:47:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:47:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:47:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:47:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 15:>                                                         (0 + 1) / 1]

Error writing DataFrame: An error occurred while calling o112.csv.
: org.apache.hadoop.ipc.RpcException: RPC response has invalid length
	at org.apache.hadoop.ipc.Client$IpcStreams.readResponse(Client.java:1933)
	at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1238)
	at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1134)



<h1>Load Into Sql</h1>

In [11]:
with open("./sql.json") as f:
    j = json.load(f)
host=j['host']
db=j['database']
uName = j['User']
passW = j['Password']

table_name="forex10"


jdbc_url = f"jdbc:mysql://{host}:3306/{db}?useSSL=false"


jdbc_properties = {
    "user": uName,
    "password": passW,
    "driver": "com.mysql.cj.jdbc.Driver"
}

df.write.mode("overwrite").jdbc(jdbc_url, table_name, properties=jdbc_properties)

24/07/23 14:33:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:33:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:33:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:33:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/07/23 14:33:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [None]:
# import mysql.connector
# from mysql.connector import errorcode
# try:

#   conn = mysql.connector.connect(
#       host=host,
#       user=uName,
#       password=passW,
#       database=db
#   )
#   cursor = conn.cursor()
  
#   create_table_query = f"""
#   CREATE TABLE IF NOT EXISTS {table_name} (
#       ID BIGINT AUTO_INCREMENT PRIMARY KEY,
#       RateDate DATE,
#       CurrencyCode VARCHAR(10),
#       CurrencyName VARCHAR(50),
#       Unit INT,
#       BuyRate FLOAT,
#       SellRate FLOAT,
#       SOURCE VARCHAR(10) DEFAULT 'NRB'
#   )
#   """
#   cursor.execute(create_table_query)
  
#   # Insert DataFrame into MySQL table
#   insert_query = f"""
#   INSERT INTO {table_name} (RateDate, CurrencyCode, CurrencyName, Unit, BuyRate, SellRate, SOURCE)
#   VALUES (%s, %s, %s, %s, %s, %s, %s)
#   """
  
#   for i, row in df.iterrows():
#       cursor.execute(insert_query, (
#           row['RateDate'],
#           row['CurrencyCode'],
#           row['CountryName'],
#           row['Unit'],
#           row['BuyRate'],
#           row['SellRate'],
#           row['SOURCE']
#       ))
  
#   conn.commit()
  
#   print("Data uploaded successfully.")
        
# except mysql.connector.Error as err:
#     print(f"Error: {err}")
# finally:
#     cursor.close()
#     conn.close()
print("Forex 10 years ETL Complete")

In [13]:
spark.stop()