# **Connecting Drive to Colab**

In [3]:
# Mount Google Drive at /content/drive; the pipelines in this notebook read
# their input from and write their results to paths under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## **Apache Beam for playing around with Bank Nifty Stock Data**

### **Creating pipeline to:**
Calculate the monthly average closing price.

Calculate the daily volatility.

In [11]:
import apache_beam as beam
from datetime import datetime

def extract_month_year(date_str):
    """Return the "<month>-<year>" grouping key for a ``MM/DD/YYYY`` date string.

    Double quotes and surrounding whitespace are removed before parsing, so a
    raw CSV field such as ``"01/15/2020"`` is accepted. The month is not
    zero-padded (January 2020 becomes ``"1-2020"``).

    Raises:
        ValueError: if the cleaned text does not match ``%m/%d/%Y``.
    """
    parsed = datetime.strptime(date_str.replace('"', '').strip(), '%m/%d/%Y')
    return "{}-{}".format(parsed.month, parsed.year)

def calculate_volatility(row):
    """Return the day's trading range (High minus Low) for one parsed row.

    Args:
        row: mapping with string values under the ``'High'`` and ``'Low'``
            keys; thousands separators and double quotes are tolerated.

    Returns:
        The difference ``High - Low`` as a float.
    """
    def to_number(text):
        # Drop thousands separators and stray quotes before converting.
        return float(text.replace(',', '').replace('"', ''))

    top = to_number(row['High'])
    bottom = to_number(row['Low'])
    return top - bottom

def run_pipeline():
    """Build and run the Beam pipeline for monthly averages and daily volatility.

    Reads the Nifty Bank CSV from Drive (skipping the header line), turns each
    line into a dict keyed by column name, and writes two text outputs:

    * ``(month-year, average closing price)`` pairs, and
    * ``(date, High - Low)`` pairs.

    The pipeline runs when the ``with`` block exits.
    """
    # Imported here so the nested helper below carries its own dependency.
    import csv

    # Define the columns inside the function to avoid referencing external variables
    columns = ["Date", "Price", "Open", "High", "Low", "Vol.", "Change %"]

    def parse_line(line):
        """Parse one CSV line into a dict keyed by column name."""
        # A real CSV parser is required: a plain line.split(',') cuts quoted
        # fields that contain thousands separators (e.g. "44,360.60") in two,
        # which shifts every following column.
        return dict(zip(columns, next(csv.reader([line]))))

    def to_month_price(row):
        """Map a parsed row to (month-year key, closing price)."""
        month_key = extract_month_year(row['Date'])
        price = float(row['Price'].replace(',', '').replace('"', '').strip())
        return month_key, price

    def average(kv):
        """Map (key, grouped prices) to (key, mean price)."""
        key, prices = kv
        # Grouped values may be delivered as a lazy iterable that does not
        # support len(), so materialize them before averaging.
        prices = list(prices)
        return key, sum(prices) / len(prices)

    with beam.Pipeline() as p:
        rows = (
            p
            | "ReadFromCSV" >> beam.io.ReadFromText('/content/drive/MyDrive/Data_Mining_CMPE_255/Nifty_Bank_Data.csv', skip_header_lines=1)
            | "ParseCSV" >> beam.Map(parse_line)
        )

        # Calculate monthly average closing price.
        # NOTE: WriteToText treats its argument as a file-name prefix, so the
        # files on disk carry an additional shard suffix.
        _ = (
            rows
            | "ExtractMonthYear" >> beam.Map(to_month_price)
            | "GroupByMonthYear" >> beam.GroupByKey()
            | "ComputeAveragePrice" >> beam.Map(average)
            | "WriteMonthlyAverage" >> beam.io.WriteToText('/content/drive/MyDrive/Data_Mining_CMPE_255/apache_monthly_avg.txt')
        )

        # Calculate daily volatility
        _ = (
            rows
            | "CalculateVolatility" >> beam.Map(lambda row: (row['Date'], calculate_volatility(row)))
            | "WriteVolatility" >> beam.io.WriteToText('/content/drive/MyDrive/Data_Mining_CMPE_255/apache_volatility.txt')
        )

# Build and execute the pipeline defined above; results are written to Drive.
run_pipeline()




The functions have been successfully executed:

The results for the monthly average closing price are saved in apache_monthly_avg.txt.

The results for the daily volatility are saved in apache_volatility.txt.


### **Composite Transform:**

The sequence of reading from the CSV, parsing the rows, extracting the prices, and computing the monthly averages can be viewed as a composite transform.

The results of the composite transforms are saved in composite_apache_monthly_avg.txt and composite_apache_volatility.txt.

In [14]:
# Column names paired positionally with each CSV line's fields by the ParseCSV
# step (assumed to match the header order of the input file). This is a
# module-level variable that the pipeline's parsing lambda reads.
columns = ["Date", "Price", "Open", "High", "Low", "Vol.", "Change %"]

class MonthlyAverageComposite(beam.PTransform):
    """Composite transform: parsed rows -> (month-year, average closing price).

    Input elements are dicts with at least the ``'Date'`` and ``'Price'`` keys
    (string values). Output elements are ``(key, mean)`` tuples, where the key
    comes from ``extract_month_year``.
    """

    def expand(self, pcoll):
        def to_month_price(row):
            """Map a parsed row to (month-year key, closing price)."""
            month_key = extract_month_year(row['Date'])
            price = float(row['Price'].replace(',', '').replace('"', '').strip())
            return month_key, price

        def average(kv):
            """Map (key, grouped prices) to (key, mean price)."""
            month_key, prices = kv
            # Grouped values may be delivered as a lazy iterable that does not
            # support len(), so materialize them before averaging.
            prices = list(prices)
            return month_key, sum(prices) / len(prices)

        return (
            pcoll
            | "ExtractMonthYear" >> beam.Map(to_month_price)
            | "GroupByMonthYear" >> beam.GroupByKey()
            | "ComputeAveragePrice" >> beam.Map(average)
        )

class DailyVolatilityComposite(beam.PTransform):
    """Composite transform: parsed rows -> (date, High minus Low).

    Each input element is a dict holding ``'Date'``, ``'High'`` and ``'Low'``;
    the date string is passed through unchanged as the first tuple item.
    """

    def expand(self, pcoll):
        def date_and_range(row):
            # Pair the raw date field with that day's trading range.
            return (row['Date'], calculate_volatility(row))

        return pcoll | "CalculateVolatility" >> beam.Map(date_and_range)

def parse_csv_line(line):
    """Parse one CSV line into a dict keyed by the names in ``columns``."""
    # Local import keeps this cell runnable on its own.
    import csv

    # A real CSV parser is required: a plain line.split(',') cuts quoted
    # fields that contain thousands separators (e.g. "44,360.60") in two,
    # which shifts every following column.
    return dict(zip(columns, next(csv.reader([line]))))


# Read and parse the CSV once, then feed the same rows to both composites.
with beam.Pipeline() as p:
    rows = (
        p | "ReadFromCSV" >> beam.io.ReadFromText('/content/drive/MyDrive/Data_Mining_CMPE_255/Nifty_Bank_Data.csv', skip_header_lines=1)
          | "ParseCSV" >> beam.Map(parse_csv_line)
    )

    # Using the MonthlyAverageComposite transform
    monthly_avg = (
        rows | "MonthlyAverage" >> MonthlyAverageComposite()
             | "WriteMonthlyAverage" >> beam.io.WriteToText('/content/drive/MyDrive/Data_Mining_CMPE_255/composite_apache_monthly_avg.txt')
    )

    # Using the DailyVolatilityComposite transform
    volatility = (
        rows | "DailyVolatility" >> DailyVolatilityComposite()
             | "WriteVolatility" >> beam.io.WriteToText('/content/drive/MyDrive/Data_Mining_CMPE_255/composite_apache_volatility.txt')
    )

### **Triggers:**
In our pipeline, we used AfterCount(10), which means the trigger fires after 10 elements have been processed in a window. This is just a demonstration; in a real streaming scenario, different triggers might be more appropriate.

Note: This trigger was flagged as potentially unsafe due to the possibility of data loss, so we added the --allow_unsafe_triggers flag.

### **ParDo:**
We used ParDo in conjunction with the ProcessWindow class to compute the average price for each window of data. The ProcessWindow class, which is a subclass of DoFn, defines the logic to process each window and compute the average price.

### **Windowing**
We implemented custom calendar-based windowing logic to group data by month. Instead of using predefined fixed or sliding windows, we used the start of each month as a key for grouping. Each "window" in this case corresponds to a month, and the GroupByKey transform effectively creates these monthly windows.

In [None]:
# Install Apache Beam in Colab
!pip install apache-beam

# Import necessary libraries
import apache_beam as beam
from apache_beam.transforms.core import ParDo, DoFn
from datetime import datetime

# Define the helper functions and classes
def extract_month_year(date_str):
    """Map a (possibly quoted) ``MM/DD/YYYY`` string to a "month-year" label.

    The month is not zero-padded, e.g. ``"03/15/2021"`` gives ``"3-2021"``.
    """
    unquoted = date_str.replace('"', '')
    when = datetime.strptime(unquoted.strip(), '%m/%d/%Y')
    return str(when.month) + "-" + str(when.year)

def month_start_key_str(date_str):
    """Return the first day of the date's month as a ``YYYY-MM-DD`` string.

    Double quotes and surrounding whitespace are stripped from ``date_str``
    before it is parsed as ``MM/DD/YYYY``; the result serves as a per-month
    grouping key.
    """
    parsed = datetime.strptime(date_str.replace('"', '').strip(), '%m/%d/%Y')
    first_of_month = datetime(parsed.year, parsed.month, 1)
    return first_of_month.strftime('%Y-%m-%d')

class ProcessWindow(DoFn):
    """DoFn that reduces one grouped ``(key, prices)`` element to its mean."""

    def process(self, element, window=DoFn.WindowParam):
        """Emit ``(key, average price)`` for a single grouped element.

        Args:
            element: a ``(key, values)`` pair as produced by GroupByKey.
            window: the element's window, injected by Beam; not used in the
                computation.

        Returns:
            A one-item list holding the ``(key, average)`` tuple.
        """
        key, values = element
        # Grouped values may be delivered as a lazy iterable that does not
        # support len(), so materialize them before averaging.
        prices = list(values)
        avg_price = sum(prices) / len(prices)
        return [(key, avg_price)]

# Define the pipeline with custom calendar-based windowing
columns = ["Date", "Price", "Open", "High", "Low", "Vol.", "Change %"]

# Add options to the pipeline to allow unsafe triggers (in case you use any)
options = beam.options.pipeline_options.PipelineOptions(flags=['--allow_unsafe_triggers'])


def parse_csv_line(line):
    """Parse one CSV line into a dict keyed by the names in ``columns``."""
    # Local import keeps this cell runnable on its own.
    import csv

    # A real CSV parser is required: a plain line.split(',') cuts quoted
    # fields that contain thousands separators (e.g. "44,360.60") in two,
    # which shifts every following column.
    return dict(zip(columns, next(csv.reader([line]))))


with beam.Pipeline(options=options) as p:
    # Key every closing price by the first day of its month.
    rows = (
        p | "ReadFromCSV" >> beam.io.ReadFromText('/content/drive/MyDrive/Data_Mining_CMPE_255/Nifty_Bank_Data.csv', skip_header_lines=1)
          | "ParseCSV" >> beam.Map(parse_csv_line)
          | "ExtractPrice" >> beam.Map(lambda row: (month_start_key_str(row['Date']), float(row['Price'].replace(',', '').replace('"', '').strip())))
    )

    # Using GroupByKey to simulate monthly windows
    monthly_avg = (
        rows
        | "GroupByKey" >> beam.GroupByKey()
        | "ProcessWindow" >> ParDo(ProcessWindow())
        | "WriteResults" >> beam.io.WriteToText('/content/drive/MyDrive/Data_Mining_CMPE_255/apache_custom_windowed_results.txt')
    )


We used Apache Beam to look at Nifty Bank's data and find out the average price for each month. We faced some problems like wrong data formats, but we fixed them. Now, we have a setup that can handle a lot of data and can be used again for other projects. It's a good start for more detailed data studies in the future.