
Name: Harshith Akkapelli

Task 3: Apache Beam

Dataset Link: [Graduate Admission 2](https://www.kaggle.com/datasets/mohansacharya/graduate-admissions?select=Admission_Predict.csv)


In [None]:
"""Mount Google Drive to access your data"""
# google.colab is supplied by the Colab runtime, so this cell is Colab-specific.
from google.colab import drive  # Colab's Google Drive integration
# Expose the Drive under /content/drive; the later cells read their CSVs from there.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
"""IMPORT DATA AND HANDLE MISSING VALUES"""
import pandas as pd #Import Library

def importData(url):
  """Load a CSV file and drop every row that contains a missing value.

  Args:
    url: Path or URL of the CSV file; handed unchanged to ``pd.read_csv``.

  Returns:
    A pandas DataFrame with the file's contents, excluding any row that
    holds at least one NA value.
  """
  print("Started Reading Data")
  getData = pd.read_csv(url)
  print("Completed Reading Data")
  # Previously the function fell off the end and returned None, so the caller
  # never received the data; it also never removed the NA rows it promised to.
  return getData.dropna()

# Location of the input CSV on the mounted Google Drive.
url = '/content/drive/MyDrive/1c.csv'
# Run the loader; df is bound to whatever importData returns.
df = importData(url)

Started Reading Data
Completed Reading Data


In [None]:
!pip install apache-beam




In [None]:
import pandas as pd

# Read the dataset CSV from the mounted Drive with pandas.
# NOTE(review): this rebinds df, replacing the value assigned from 1c.csv in the
# earlier cell — confirm which file is the intended input.
df = pd.read_csv('/content/drive/MyDrive/dataset.csv')
# Convert to a list with one dict per row (column name -> value) so each row can
# become one element of the Beam pipeline.
records = df.to_dict('records')

import apache_beam as beam
from apache_beam.transforms.trigger import AfterCount

# 3. Define the Beam Pipeline
# allow_unsafe_triggers=True accompanies the bare AfterCount trigger used in the
# windowing step. NOTE(review): presumably Beam would otherwise reject that
# trigger as potentially data-losing — confirm against the Beam trigger docs.
p = beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(allow_unsafe_triggers=True))

# Feed the in-memory list of row dicts into the pipeline as its source PCollection.
data = p | "CreateFromIterable" >> beam.Create(records)

# 4. ParDo Transform to Extract GRE Scores
class ExtractGRE(beam.DoFn):
    """DoFn that emits the "GRE Score" field of each incoming record."""

    def process(self, record):
        # One output per input: the value stored under the "GRE Score" key.
        gre = record["GRE Score"]
        yield gre

# Apply the ExtractGRE DoFn to every record, yielding a PCollection of GRE scores.
gre_scores = data | "ExtractGREScores" >> beam.ParDo(ExtractGRE())

# Intermediate Print Step
# Debug-only branch: prints each score when the pipeline runs. print() returns
# None, so the resulting PCollection is not assigned or reused.
gre_scores | "PrintGREScores" >> beam.Map(lambda x: print(f"GRE Score: {x}"))

# 5. Windowing and Triggers
# Add a key to each element and apply windowing
# NOTE(review): nothing in this cell assigns event timestamps to the elements,
# so presumably they all fall into a single fixed window — confirm.
windowed_scores = (
    gre_scores
    # GroupByKey needs (key, value) pairs; every score gets the same constant key 1.
    | "AddKey" >> beam.Map(lambda x: (1, x))
    | "WindowInto" >> beam.WindowInto(
        # Fixed windows of size 60 (seconds in Beam's API).
        beam.window.FixedWindows(60),
        # Count-based trigger with a threshold of 10 elements.
        trigger=AfterCount(10),
        # DISCARDING: a pane does not re-emit elements from earlier firings.
        accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
    # Collect the values sharing a key (and window) into one (key, values) element.
    | "GroupByWindow" >> beam.GroupByKey()
)

# Intermediate Print Step for Windowed Scores
# Debug-only branch: prints each grouped (key, scores) element at run time.
windowed_scores | "PrintWindowedScores" >> beam.Map(lambda x: print(f"Windowed Scores (Key, Scores): {x}"))

# 6. Composite Transform to Calculate the Mean GRE Score
class CalculateMean(beam.PTransform):
    """Composite transform: pull the GRE score out of each record, then average them globally."""

    def expand(self, pcoll):
        # Step 1: reduce each record to its GRE score.
        scores = pcoll | "ExtractScores" >> beam.ParDo(ExtractGRE())
        # Step 2: combine all scores into a single mean value.
        mean_fn = beam.combiners.MeanCombineFn()
        return scores | "CalculateMean" >> beam.CombineGlobally(mean_fn)

# Apply the composite transform to the raw records (not to gre_scores):
# CalculateMean performs its own GRE extraction internally.
mean_gre = data | "MeanGRE" >> CalculateMean()

# 7. Print the Mean GRE Score
# Prints the combined mean when the pipeline runs; as the last expression of the
# cell, the resulting PCollection is what the notebook echoes as cell output.
mean_gre | "PrintMean" >> beam.Map(lambda x: print(f"Average GRE Score: {x}"))





<PCollection[[69]: PrintMean.None] at 0x7ee885d51630>

In [None]:
# Execute the pipeline graph assembled in the previous cell and block until the
# run completes; the print steps produce their output during this call.
p.run().wait_until_finish()




GRE Score: 337
GRE Score: 324
GRE Score: 316
GRE Score: 322
GRE Score: 314
GRE Score: 330
GRE Score: 321
GRE Score: 308
GRE Score: 302
GRE Score: 323
GRE Score: 325
GRE Score: 327
GRE Score: 328
GRE Score: 307
GRE Score: 311
GRE Score: 314
GRE Score: 317
GRE Score: 319
GRE Score: 318
GRE Score: 303
GRE Score: 312
GRE Score: 325
GRE Score: 328
GRE Score: 334
GRE Score: 336
GRE Score: 340
GRE Score: 322
GRE Score: 298
GRE Score: 295
GRE Score: 310
GRE Score: 300
GRE Score: 327
GRE Score: 338
GRE Score: 340
GRE Score: 331
GRE Score: 320
GRE Score: 299
GRE Score: 300
GRE Score: 304
GRE Score: 307
GRE Score: 308
GRE Score: 316
GRE Score: 313
GRE Score: 332
GRE Score: 326
GRE Score: 322
GRE Score: 329
GRE Score: 339
GRE Score: 321
GRE Score: 327
GRE Score: 313
GRE Score: 312
GRE Score: 334
GRE Score: 324
GRE Score: 322
GRE Score: 320
GRE Score: 316
GRE Score: 298
GRE Score: 300
GRE Score: 311
GRE Score: 309
GRE Score: 307
GRE Score: 304
GRE Score: 315
GRE Score: 325
GRE Score: 325
GRE Score:

'DONE'