In [1]:
import boto3

# Initialize the DynamoDB client (local instance)
dynamodb = boto3.resource(
    'dynamodb',
    endpoint_url='http://dynamodb-local:8000',  # Local DynamoDB URL
    region_name='us-west-2',               # AWS region (optional for local)
    aws_access_key_id='dummy',             # Dummy credentials for local use
    aws_secret_access_key='dummy'          # Dummy credentials for local use
)

DB_TABLE_NAME = "processed"

In [2]:
# Create DB Table
# Define table parameters
table_name = DB_TABLE_NAME
attribute_definitions = [
    {
        'AttributeName': 'pk',
        'AttributeType': 'S'  # S for String, N for Number, B for Binary
    }
]
key_schema = [
    {
        'AttributeName': 'pk',
        'KeyType': 'HASH'  # Partition key
    }
]
provisioned_throughput = {
    'ReadCapacityUnits': 1,
    'WriteCapacityUnits': 1
}

# Create the table
try:
    table = dynamodb.create_table(
        TableName=table_name,
        AttributeDefinitions=attribute_definitions,
        KeySchema=key_schema,
        ProvisionedThroughput=provisioned_throughput
    )
    # Wait until the table is created
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
    print(f"Table '{table_name}' created successfully.")
except Exception as e:
    print(f"Error creating table: {e}")



Table 'processed' created successfully.


In [3]:
# Read data
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, col, when, count, isnan
from pyspark.ml.feature import StringIndexer, VectorAssembler


spark = SparkSession.builder.appName("HealthcareDataAnalysis").getOrCreate()

columns = [
    "age", "sex", "cp", "trestbps", "chol", "fbs", "restecg",
    "thalach", "exang", "oldpeak", "slope", "ca", "thal", "target"
]

data = spark.read.csv("hdfs://namenode:9000/input/heart_decease.csv", header=False, inferSchema=True)

data = data.toDF(*columns)


data.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/25 15:18:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+------+
| age|sex| cp|trestbps| chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+------+
|63.0|1.0|1.0|   145.0|233.0|1.0|    2.0|  150.0|  0.0|    2.3|  3.0|0.0| 6.0|     0|
|67.0|1.0|4.0|   160.0|286.0|0.0|    2.0|  108.0|  1.0|    1.5|  2.0|3.0| 3.0|     2|
|67.0|1.0|4.0|   120.0|229.0|0.0|    2.0|  129.0|  1.0|    2.6|  2.0|2.0| 7.0|     1|
|37.0|1.0|3.0|   130.0|250.0|0.0|    0.0|  187.0|  0.0|    3.5|  3.0|0.0| 3.0|     0|
|41.0|0.0|2.0|   130.0|204.0|0.0|    2.0|  172.0|  0.0|    1.4|  1.0|0.0| 3.0|     0|
|56.0|1.0|2.0|   120.0|236.0|0.0|    0.0|  178.0|  0.0|    0.8|  1.0|0.0| 3.0|     0|
|62.0|0.0|4.0|   140.0|268.0|0.0|    2.0|  160.0|  0.0|    3.6|  3.0|2.0| 3.0|     3|
|57.0|0.0|4.0|   120.0|354.0|0.0|    0.0|  163.0|  1.0|    0.6|  1.0|0.0| 3.0|     0|
|63.0|1.0|4.0|   130.0|254.0|0.0|    2.0|  147.0|  0.0

## Column Headers and Their Descriptions

**age**: Age of the patient in years.

**sex**: Sex of the patient.
- `1` = Male
- `0` = Female

**cp**: Chest pain type (4 types).
- `1` = Typical angina: chest pain related to decreased blood supply to the heart.
- `2` = Atypical angina: chest pain not related to the heart.
- `3` = Non-anginal pain: typically esophageal or another form of chest pain.
- `4` = Asymptomatic: no chest pain.

**trestbps**: Resting blood pressure (in mm Hg on admission to the hospital).
- This is the blood pressure measured while at rest.

**chol**: Serum cholesterol in mg/dl.
- Represents the cholesterol level of the patient.

**fbs**: Fasting blood sugar > 120 mg/dl.
- `1` = True (fasting blood sugar is higher than 120 mg/dl)
- `0` = False (fasting blood sugar is lower than or equal to 120 mg/dl)

**restecg**: Resting electrocardiographic results (values 0, 1, 2).
- `0` = Normal.
- `1` = Having ST-T wave abnormality (T wave inversions and/or ST elevation or depression of > 0.05 mV).
- `2` = Showing probable or definite left ventricular hypertrophy by Estes' criteria.

**thalach**: Maximum heart rate achieved.
- The highest heart rate reached during a stress test.

**exang**: Exercise-induced angina.
- `1` = Yes (angina induced by exercise)
- `0` = No (no angina induced by exercise)

**oldpeak**: ST depression induced by exercise relative to rest.
- Represents the difference between the heart's state during rest and during exercise.

**slope**: The slope of the peak exercise ST segment.
- `1` = Upsloping: better heart rate recovery.
- `2` = Flat: minimal or no change in the ST segment.
- `3` = Downsloping: indicative of heart problems.

**ca**: Number of major vessels (0-3) colored by fluoroscopy.
- Represents the number of major blood vessels that are visible after injecting a contrast dye.
- The values range from `0` to `3`.

**thal**: Thalassemia (a blood disorder that affects hemoglobin levels).
- `3` = Normal.
- `6` = Fixed defect: no proper blood movement in part of the heart.
- `7` = Reversible defect: blood flow is impaired, but not permanently.

**target**: Diagnosis of heart disease (angiographic disease status).
- `0` = No presence of heart disease.
- `1` = Presence of heart disease.


In [5]:
# Persist data
table = dynamodb.Table(DB_TABLE_NAME)

# Put an item into the table
table.put_item(
    Item={
        'pk': '123',
        'Name': 'Example Item'
    }
)

{'ResponseMetadata': {'RequestId': 'ebcb417d-c833-440c-9e70-d903768bf3b7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Jetty(12.0.8)',
   'date': 'Sat, 26 Oct 2024 10:05:47 GMT',
   'x-amzn-requestid': 'ebcb417d-c833-440c-9e70-d903768bf3b7',
   'content-type': 'application/x-amz-json-1.0',
   'x-amz-crc32': '2745614147',
   'content-length': '2'},
  'RetryAttempts': 0}}

In [10]:
from decimal import Decimal, ROUND_HALF_UP, InvalidOperation

# Helper function to safely convert to Decimal
def safe_decimal(value):
    # Check if the value is None or non-numeric
    if value is None:
        return None
    try:
        return Decimal(value).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    except (InvalidOperation, TypeError):
        return None  # Return None or another placeholder if conversion fails

# Iterate through each row in the DataFrame and store it in DynamoDB
for row in data.collect():
    table.put_item(
        Item={
            'pk': f"patient_{int(row['age'])}_{int(row['sex'])}",  # Unique ID combining age and sex
            'age': safe_decimal(row['age']),
            'sex': 'Male' if row['sex'] == 1 else 'Female',
            'cp': safe_decimal(row['cp']),
            'trestbps': safe_decimal(row['trestbps']),
            'chol': safe_decimal(row['chol']),
            'fbs': safe_decimal(row['fbs']),
            'restecg': safe_decimal(row['restecg']),
            'thalach': safe_decimal(row['thalach']),
            'exang': safe_decimal(row['exang']),
            'oldpeak': safe_decimal(row['oldpeak']),
            'slope': safe_decimal(row['slope']),
            'ca': safe_decimal(row['ca']),
            'thal': safe_decimal(row['thal']),
            'target': safe_decimal(row['target'])
        }
    )
print("Data stored in DynamoDB successfully.")

Data stored in DynamoDB successfully.


In [11]:
import boto3
from boto3.dynamodb.conditions import Key

# Initialize the DynamoDB client (assuming local instance)
dynamodb = boto3.resource(
    'dynamodb',
    endpoint_url='http://dynamodb-local:8000',  # Local DynamoDB URL
    region_name='us-west-2',
    aws_access_key_id='dummy',
    aws_secret_access_key='dummy'
)

# Specify the table name
table = dynamodb.Table('processed')

# Retrieve a few items from DynamoDB
try:
    # Scan the table to retrieve all items (useful for validation)
    response = table.scan(Limit=5)  # Retrieve 5 items to check for data accuracy
    
    # Print the retrieved items
    items = response.get('Items', [])
    print("Retrieved items from DynamoDB:")
    for item in items:
        print(item)
except Exception as e:
    print(f"Error retrieving items: {e}")

Retrieved items from DynamoDB:
{'exang': Decimal('1'), 'sex': 'Male', 'thal': Decimal('6'), 'chol': Decimal('169'), 'slope': Decimal('3'), 'cp': Decimal('4'), 'trestbps': Decimal('120'), 'target': Decimal('2'), 'oldpeak': Decimal('2.8'), 'thalach': Decimal('144'), 'fbs': Decimal('0'), 'pk': 'patient_44_1', 'age': Decimal('44'), 'ca': Decimal('0'), 'restecg': Decimal('0')}
{'exang': Decimal('1'), 'sex': 'Female', 'thal': Decimal('3'), 'chol': Decimal('243'), 'slope': Decimal('2'), 'cp': Decimal('4'), 'trestbps': Decimal('138'), 'target': Decimal('0'), 'oldpeak': Decimal('0'), 'thalach': Decimal('152'), 'fbs': Decimal('0'), 'pk': 'patient_46_0', 'age': Decimal('46'), 'ca': Decimal('0'), 'restecg': Decimal('2')}
{'exang': Decimal('0'), 'sex': 'Female', 'thal': Decimal('3'), 'chol': Decimal('306'), 'slope': Decimal('1'), 'cp': Decimal('2'), 'trestbps': Decimal('126'), 'target': Decimal('0'), 'oldpeak': Decimal('0'), 'thalach': Decimal('163'), 'fbs': Decimal('0'), 'pk': 'patient_41_0', 'age