In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Initialize the Glue job. JOB_NAME is resolved from the command-line
# arguments; job.init() and job.commit() bracket the run.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Extract: read the raw JSON records from S3 into a DynamicFrame.
# Every step below is given a transformation_ctx. Glue keys its job-bookmark
# state on that identifier, so without it a bookmark-enabled job would re-read
# the whole raw prefix (and re-write it to the processed prefix) on every run.
# The argument has no effect when bookmarks are disabled for the job.
datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://hexaware-data/raw/"]},
    format="json",
    transformation_ctx="datasource0",
)

# Transform: keep three fields. Each mapping is
# (source field, source type, target field, target type), so
# assessment_score is renamed to score and the timestamp string is cast to a
# timestamp column named assessment_time.
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[
        ("student_id", "string", "student_id", "string"),
        ("assessment_score", "double", "score", "double"),
        ("timestamp", "string", "assessment_time", "timestamp"),
    ],
    transformation_ctx="applymapping1",
)

# Load: write the mapped records to the processed prefix as Parquet.
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame=applymapping1,
    connection_type="s3",
    connection_options={"path": "s3://hexaware-data/processed/"},
    format="parquet",
    transformation_ctx="datasink2",
)

job.commit()


ModuleNotFoundError: No module named 'awsglue'

**Generating Assessment Questions with OpenAI GPT-4 (Python)**

In [None]:
import openai
import os

# Configure the OpenAI API key from the OPENAI_API_KEY environment variable.
# os.getenv returns None when the variable is unset, so a missing key is not
# reported at this point.
openai.api_key = os.getenv("OPENAI_API_KEY")

def generate_assessment_question(topic, difficulty):
    """Ask the GPT-4 chat model for one programming assessment question.

    Args:
        topic: Subject of the question; interpolated into the prompt.
        difficulty: Difficulty label; interpolated into the prompt.

    Returns:
        The content of the first returned choice, with leading and trailing
        whitespace stripped.
    """
    user_prompt = f"Generate a {difficulty} level question on the topic of {topic} suitable for a programming assessment."
    conversation = [
        {"role": "system", "content": "You are an expert in generating programming assessment questions."},
        {"role": "user", "content": user_prompt},
    ]
    request = {
        "model": "gpt-4",
        "messages": conversation,
        "max_tokens": 150,
        "n": 1,
        "stop": None,
        "temperature": 0.7,
    }

    completion = openai.ChatCompletion.create(**request)
    first_choice = completion.choices[0]
    return first_choice.message['content'].strip()

# Example usage: request one intermediate-level question on data structures
# and print it.
topic, difficulty = "Data Structures", "Intermediate"
question = generate_assessment_question(topic, difficulty)
print("Generated Question:", question)


**AWS Lambda Function for Adaptive Assessment (Python)**

In [None]:
import json
import boto3
import openai
import os

# Configure the OpenAI API key from the OPENAI_API_KEY environment variable.
# os.getenv returns None when the variable is unset, so a missing key is not
# reported at this point.
openai.api_key = os.getenv("OPENAI_API_KEY")

def lambda_handler(event, context):
    """Lambda entry point that returns the learner's next assessment question.

    The request body is a JSON object with:
        learner_id: required identifier of the learner.
        previous_scores: optional list of numeric scores; missing, null or
            empty means the learner has no history.
        topic: optional question topic; defaults to "General Programming"
            when missing, null or empty.

    Args:
        event: Invocation event; its 'body' entry holds the JSON request.
        context: Lambda context object (unused).

    Returns:
        A dict with 'statusCode', a JSON-encoded 'body' and 'headers'.
        Status 200 carries 'question' and 'difficulty'; status 400 carries an
        'error' message describing what was wrong with the request.
    """
    # Malformed requests get a 400 with an explanation instead of escaping as
    # KeyError / JSONDecodeError / TypeError from the handler.
    raw_body = event.get('body') if isinstance(event, dict) else None
    if raw_body is None:
        return _json_response(400, {'error': 'Request body is required.'})

    try:
        body = json.loads(raw_body)
    except (TypeError, ValueError):
        # JSONDecodeError is a ValueError; TypeError covers non-text bodies.
        return _json_response(400, {'error': 'Request body must be valid JSON.'})

    if not isinstance(body, dict):
        return _json_response(400, {'error': 'Request body must be a JSON object.'})
    if 'learner_id' not in body:
        return _json_response(400, {'error': 'learner_id is required.'})

    # A null or otherwise empty history is treated like a missing one.
    previous_scores = body.get('previous_scores') or []
    scores_are_numeric = isinstance(previous_scores, (list, tuple)) and all(
        isinstance(score, (int, float)) and not isinstance(score, bool)
        for score in previous_scores
    )
    if not scores_are_numeric:
        return _json_response(400, {'error': 'previous_scores must be a list of numbers.'})

    difficulty = _difficulty_for(previous_scores)

    # Fall back to the default topic for null/empty values as well, so that
    # "None" or an empty string never ends up inside the prompt.
    topic = body.get('topic') or "General Programming"
    question = generate_assessment_question(topic, difficulty)

    return _json_response(200, {
        'question': question,
        'difficulty': difficulty
    })


def _difficulty_for(previous_scores):
    """Map a list of numeric scores to the next question's difficulty label."""
    if not previous_scores:
        return "Beginner"
    average_score = sum(previous_scores) / len(previous_scores)
    if average_score > 80:
        return "Advanced"
    if average_score > 50:
        return "Intermediate"
    return "Beginner"


def _json_response(status_code, payload):
    """Build the handler's response dict with a JSON-encoded body."""
    return {
        'statusCode': status_code,
        'body': json.dumps(payload),
        'headers': {
            'Content-Type': 'application/json',
        },
    }

def generate_assessment_question(topic, difficulty):
    """Ask the GPT-4 chat model for one programming assessment question.

    Args:
        topic: Subject of the question; interpolated into the prompt.
        difficulty: Difficulty label; interpolated into the prompt.

    Returns:
        The content of the first returned choice, with leading and trailing
        whitespace stripped.
    """
    user_prompt = f"Generate a {difficulty} level question on the topic of {topic} suitable for a programming assessment."
    conversation = [
        {"role": "system", "content": "You are an expert in generating programming assessment questions."},
        {"role": "user", "content": user_prompt},
    ]
    request = {
        "model": "gpt-4",
        "messages": conversation,
        "max_tokens": 150,
        "n": 1,
        "stop": None,
        "temperature": 0.7,
    }

    completion = openai.ChatCompletion.create(**request)
    first_choice = completion.choices[0]
    return first_choice.message['content'].strip()


**Real-Time Feedback Generation with Apache Spark (Python)**


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import openai
import os

# Get (or create) the Spark session for the "RealTimeFeedback" application.
spark = SparkSession.builder.appName("RealTimeFeedback").getOrCreate()

# Configure the OpenAI API key from the OPENAI_API_KEY environment variable
# (os.getenv returns None when it is unset).
# NOTE(review): this assignment runs in the driver process; presumably the
# executors that run the feedback UDF need OPENAI_API_KEY in their own
# environment as well - confirm against the cluster configuration.
openai.api_key = os.getenv("OPENAI_API_KEY")

# Define UDF to generate feedback
def generate_feedback(answer, correct_answer):
    """Ask the GPT-4 chat model for feedback on a learner's answer.

    Args:
        answer: The learner's answer; interpolated into the prompt.
        correct_answer: The reference answer; interpolated into the prompt.

    Returns:
        The content of the first returned choice, with leading and trailing
        whitespace stripped.
    """
    user_prompt = f"Provide constructive feedback for the following answer:\n\nAnswer: {answer}\n\nCorrect Answer: {correct_answer}"
    conversation = [
        {"role": "system", "content": "You are an educational assistant providing feedback."},
        {"role": "user", "content": user_prompt},
    ]
    request = {
        "model": "gpt-4",
        "messages": conversation,
        "max_tokens": 150,
        "n": 1,
        "stop": None,
        "temperature": 0.7,
    }

    completion = openai.ChatCompletion.create(**request)
    first_choice = completion.choices[0]
    return first_choice.message['content'].strip()

# Wrap generate_feedback as a Spark UDF that produces a string column.
feedback_udf = udf(generate_feedback, returnType=StringType())

# Open a streaming DataFrame subscribed to the "learner-responses" Kafka topic.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-server:9092")
    .option("subscribe", "learner-responses")
    .load()
)

# Assume value is a JSON string with 'answer' and 'correct_answer'
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Schema of the JSON payload carried in each Kafka message value.
schema = StructType([
    StructField("answer", StringType(), True),
    StructField("correct_answer", StringType(), True)
])

# Parse each Kafka value as JSON and flatten the struct into columns.
# from_json produces null fields for values that are not valid JSON or that
# lack a field; those rows are dropped here so they do not trigger a model
# call whose prompt would contain the literal text "None".
responses = (
    df.select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .where(col("answer").isNotNull() & col("correct_answer").isNotNull())
)

# Generate feedback for every remaining response via the UDF.
feedback = responses.withColumn("feedback", feedback_udf(col("answer"), col("correct_answer")))

# Write the feedback text to the "learner-feedback" Kafka topic as the
# message value. awaitTermination() blocks until the streaming query stops.
(
    feedback.selectExpr("CAST(feedback AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-server:9092")
    .option("topic", "learner-feedback")
    .option("checkpointLocation", "/tmp/spark-checkpoints")
    .start()
    .awaitTermination()
)


In [None]:
from aif360.datasets import BinaryLabelDataset
from aif360.metrics import ClassificationMetric
from aif360.algorithms.preprocessing import Reweighing
import pandas as pd

# Load the processed assessment data.
# NOTE(review): presumably the CSV is fully numeric with no missing values,
# with an 'outcome' label column and a binary 'gender' column - confirm
# against the file, since the dataset is built directly from this frame.
data = pd.read_csv("processed_assessment_data.csv")

# Group definitions used by Reweighing and by the fairness metrics:
# gender == 1 is treated as privileged, gender == 0 as unprivileged.
privileged_groups = [{'gender': 1}]
unprivileged_groups = [{'gender': 0}]

# Create the BinaryLabelDataset. The group definitions are deliberately NOT
# passed here: the dataset constructor takes the frame, the label names and
# the protected attribute names, and rejects privileged_groups /
# unprivileged_groups as unexpected keyword arguments.
dataset = BinaryLabelDataset(
    df=data,
    label_names=['outcome'],
    protected_attribute_names=['gender'],
)

# Apply Reweighing to mitigate bias; the transformed dataset is what the
# following steps train and evaluate on.
reweighing = Reweighing(unprivileged_groups=unprivileged_groups,
                        privileged_groups=privileged_groups)
dataset_transf = reweighing.fit_transform(dataset)

# Train a classifier (example with logistic regression)
from sklearn.linear_model import LogisticRegression
from aif360.algorithms.postprocessing import CalibratedEqOddsPostprocessing

X = dataset_transf.features
y = dataset_transf.labels.ravel()

# Fit with the instance weights of the reweighed dataset. Without
# sample_weight the classifier never sees the weights and the reweighing step
# has no influence on the fitted model.
model = LogisticRegression(solver='liblinear')
model.fit(X, y, sample_weight=dataset_transf.instance_weights)

# Predict on the same rows the model was trained on, then store the
# predictions in a copy of the dataset so the metric can compare the two.
y_pred = model.predict(X)
dataset_pred = dataset_transf.copy(deepcopy=True)
# Store predictions as a column vector so the copy's labels keep a 2-D layout
# (the original labels need .ravel() above to become 1-D).
dataset_pred.labels = y_pred.reshape(-1, 1)

# Compute fairness metrics comparing true labels with predicted labels.
metric = ClassificationMetric(dataset_transf, dataset_pred,
                              unprivileged_groups=unprivileged_groups,
                              privileged_groups=privileged_groups)
print("Disparate Impact:", metric.disparate_impact())
print("Equal Opportunity Difference:", metric.equal_opportunity_difference())
