In [None]:
!pip install apache-beam --quiet
!pip install apache-beam[gcp] --upgrade
!pip install apache-beam[gcp]

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from datetime import datetime
import apache_beam as beam
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import pickle



In [None]:
class SplitWords(beam.DoFn):
    """DoFn that breaks a string element into whitespace-separated tokens."""

    def process(self, element):
        """Return the list produced by calling ``split()`` on ``element``.

        Args:
            element: A string-like object exposing ``split()``.

        Returns:
            list of the whitespace-delimited pieces of ``element``.
        """
        tokens = element.split()
        return tokens

# Basic pipeline: three in-memory sentences -> SplitWords -> text sink.
with beam.Pipeline() as pipeline:
    sentences = pipeline | 'Read from Input' >> beam.Create(
        ['This is Apache Beam', 'Exploring Beam Features', 'Done by Aditya Rajpurohit']
    )
    tokens = sentences | 'Split words' >> beam.ParDo(SplitWords())
    # `words` ends up bound to the result of the write step.
    words = tokens | 'Write to Output' >> beam.io.WriteToText('output.txt')


class ProcessText(beam.PTransform):
    """Composite transform: tokenize with SplitWords, then drop short tokens."""

    def expand(self, pcoll):
        """Apply the split and filter steps to ``pcoll``.

        Args:
            pcoll: Input collection handed to ``SplitWords``.

        Returns:
            The collection of tokens whose length is greater than 3.
        """
        tokens = pcoll | 'Split text' >> beam.ParDo(SplitWords())
        return tokens | 'Filter short words' >> beam.Filter(lambda token: len(token) > 3)

# Composite-transform pipeline: phrases -> ProcessText -> text sink.
with beam.Pipeline() as pipeline:
    phrases = pipeline | 'Create input' >> beam.Create(
        ['Apache Beam Pipeline', 'Learning transforms', 'Aditya Rajpurohit']
    )
    long_words = phrases | 'Process text' >> ProcessText()
    # `result` ends up bound to the result of the write step.
    result = long_words | 'Write to text' >> beam.io.WriteToText('composite_output.txt')



In [None]:
class SplitWords(beam.DoFn):
    """DoFn that splits the text of a record into whitespace-separated words.

    This definition replaces the earlier string-only ``SplitWords`` in the
    notebook namespace. It therefore accepts both shapes: a bare string, or a
    sequence whose first item is the text (e.g. ``(text, event_time)``).
    Without that, re-running an earlier cell after this one would index a
    plain string with ``[0]`` and split only its first character.
    """

    def process(self, element):
        """Return the words of the record's text.

        Args:
            element: Either a ``str`` or an indexable record whose item 0 is
                the text to split.

        Returns:
            list of whitespace-delimited words.
        """
        text = element if isinstance(element, str) else element[0]
        return text.split()

# Windowing demo: ten identical (message, event_time) records are windowed
# into fixed 10-second intervals, split into words, and written as text.
with beam.Pipeline() as pipeline:
    windowed_data = (
        pipeline
        | 'Create stream data' >> beam.Create([("This is a test message for apache beam testing- Aditya", datetime.now()) for _ in range(10)])
        # Beam windows on each element's timestamp metadata, not on values
        # stored inside the element. Attach the record's own datetime as its
        # event timestamp so FixedWindows actually buckets on it.
        | 'Attach event timestamps' >> beam.Map(
            lambda record: beam.window.TimestampedValue(record, record[1].timestamp()))
        | 'Window into fixed intervals' >> beam.WindowInto(FixedWindows(10))  # 10-second windows
        | 'Process elements' >> beam.ParDo(SplitWords())
        | 'Write windowed output' >> beam.io.WriteToText('window_output.txt')
    )

In [None]:
# Train a random forest on the first two iris feature columns and pickle it.
iris = load_iris()

# Only columns 0 and 1 are kept, so the fitted model expects two features.
X = iris.data[:, :2]
y = iris.target

# 70/30 split with a fixed seed. The held-out part (X_test, y_test) is not
# evaluated in this cell.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Seed the forest as well as the split: an unseeded forest is fitted
# differently on every run, so predictions would not be reproducible.
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)

# Persist the fitted model next to the notebook.
model_path = 'iris_feature_model.pkl'
with open(model_path, 'wb') as model_file:
    pickle.dump(model, model_file)


In [None]:
class SklearnInference(beam.DoFn):
    """Score one record at a time with the notebook-level ``model``.

    The DoFn reads the global ``model`` fitted in the training cell, so that
    cell must have been run first.
    """

    def process(self, element):
        """Yield the input record together with its predicted class.

        Args:
            element: Mapping with at least the keys 'feature1' and 'feature2'.

        Yields:
            dict with 'input' (the original element) and 'prediction'
            (the predicted label as a native Python value).
        """
        # The model expects a 2-D batch, so wrap the single row in a list.
        data = [[element['feature1'], element['feature2']]]
        # tolist() turns the NumPy result into native Python values. A raw
        # NumPy scalar would be written as e.g. "np.int64(0)" under NumPy 2.
        prediction = model.predict(data).tolist()
        yield {'input': element, 'prediction': prediction[0]}

# Inference pipeline: two hand-written records -> SklearnInference -> text sink.
with beam.Pipeline() as pipeline:
    samples = pipeline | 'Create sample data' >> beam.Create([
        {'feature1': 0.5, 'feature2': 1.5},
        {'feature1': 0.3, 'feature2': 0.7},
    ])
    scored = samples | 'Run manual model inference' >> beam.ParDo(SklearnInference())
    # `predictions` ends up bound to the result of the write step.
    predictions = scored | 'Write predictions' >> beam.io.WriteToText('ml_predictions.txt')


In [None]:
class SklearnInference(beam.DoFn):
    """Score one record at a time with the notebook-level ``model``.

    Only 'feature1' and 'feature2' are passed to the model, matching the two
    columns it was trained on. Any other keys in the element are ignored for
    prediction and are simply passed through under 'input'.
    """

    def process(self, element):
        """Yield the input record together with its predicted class.

        Args:
            element: Mapping with at least the keys 'feature1' and 'feature2'.

        Yields:
            dict with 'input' (the original element) and 'prediction'
            (the predicted label as a native Python value).
        """
        # The model expects a 2-D batch, so wrap the single row in a list.
        data = [[element['feature1'], element['feature2']]]
        # tolist() turns the NumPy result into native Python values. A raw
        # NumPy scalar would be written as e.g. "np.int64(0)" under NumPy 2.
        prediction = model.predict(data).tolist()
        yield {'input': element, 'prediction': prediction[0]}

# Inference pipeline over records that carry four feature keys each.
with beam.Pipeline() as pipeline:
    samples = pipeline | 'Create sample data' >> beam.Create([
        {'feature1': 0.5, 'feature2': 1.5, 'feature3': 2.5, 'feature4': 3.5},
        {'feature1': 0.3, 'feature2': 0.7, 'feature3': 1.2, 'feature4': 2.0},
    ])
    scored = samples | 'Run manual model inference' >> beam.ParDo(SklearnInference())
    # `predictions` ends up bound to the result of the write step.
    predictions = scored | 'Write predictions' >> beam.io.WriteToText('ml_predictions.txt')