In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, substring,lit
import pyspark.sql.functions as F
import os

print ("Environment verification")
print("Currecnt directory:", os.getcwd())
print("Available files en data:")
for item in os.listdir ("/home/jovyan/data/sample"):
    print(f" - {item}")

print ("\n Archives in data/sample:")
for item in os.listdir("/home/jovyan/data/sample"):
    size= os.path.getsize(f"/home/jovyan/data/sample/{item}")
    print(f" - {item} ({size:,} bytes)")


Environment verification
Currecnt directory: /home/jovyan/work
Available files en data:
 - brfss_sample.asc

 Archives in data/sample:
 - brfss_sample.asc (2,054,000 bytes)


In [2]:
spark= SparkSession.builder \
.appName("HealthTransformations") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "1g") \
.getOrCreate()

print (f" Spark initialized; {spark.version}")
print (f" Spark UI available on: http://localhost:8080")

 Spark initialized; 3.5.0
 Spark UI available on: http://localhost:8080


In [13]:
print("loading data sample...")
df_brfss= spark.read.text("/home/jovyan/data/sample/brfss_sample.asc")
print(f"Data loaded: {df_brfss.count()} registers")

df_codes=df_brfss.select(
    substring(col("value"), 1,2).alias("state_code"),
    substring(col("value"), 143,2).alias("age_code"),
    substring(col("value"), 145,1).alias("gender_code"),
    substring(col("value"), 99,1).alias("diabetes_code"),
    substring(col("value"), 105,1).alias("exercise_code")
)
df_codes.cache()
print(f"Codes extrated successfully")
print("\n Codes sample:")
df_codes.show(5)
              
              

loading data sample...
Data loaded: 1000 registers
Codes extrated successfully

 Codes sample:
+----------+--------+-----------+-------------+-------------+
|state_code|age_code|gender_code|diabetes_code|exercise_code|
+----------+--------+-----------+-------------+-------------+
|        01|        |           |             |            8|
|        01|        |           |             |            8|
|        01|        |           |             |            3|
|        01|        |           |             |            8|
|        01|        |           |             |            8|
+----------+--------+-----------+-------------+-------------+
only showing top 5 rows



In [15]:
print("Archive diagnostics")
print("Lenght per line:")
df_brfss.select(F.length(col("value")).alias("line_lenght")).distinct().show()

print("\n Main 3 lines:")
df_brfss.select(col("value")).limit(3).collect()

print("\n Key positions characters:")
df_sample_positions= df_brfss.select(
    substring(col("value"),1,10).alias("first_10_characters"),
    substring(col("value"),95,10).alias("characters_95_104"),
    substring(col("value"),140,10).alias("characters_140_149"),
    substring(col("value"),100,10).alias("characters_100_109")
).limit(10)

df_sample_positions.show(truncate=False)

Archive diagnostics
Lenght per line:
+-----------+
|line_lenght|
+-----------+
|       2052|
+-----------+


 Main 3 lines:

 Key positions characters:
+-------------------+-----------------+------------------+------------------+
|first_10_characters|characters_95_104|characters_140_149|characters_100_109|
+-------------------+-----------------+------------------+------------------+
|01                 |      2888       |                  | 28888  99        |
|01                 |      1888       |                  | 18888  03        |
|01                 |      2020       |                  | 202030201        |
|01                 |      1888       |                  | 18888  99        |
|01                 |      4028       |                  | 402888807        |
|01                 |      5018       |                  | 501888803        |
|01                 |      2888       |                  | 28888  02        |
|01                 |      3888       |                  | 38888  03

In [16]:
print ("Main 200 characters")
sample_lines= df_brfss.select(col("value")).limit(3).collect()
for i, row in enumerate(sample_lines):
    line= row ['value']
    print (f"\nLine {i+1}:")
    print(f"0-50: {line[0:50]}")
    print(f"50-100: {line[50:100]}")
    print(f"100-150: {line[100:150]}")
    print(f"150-200: {line[150:200]}")

Main 200 characters

Line 1:
0-50: 01              0102032022     11002022000001     
50-100:             11 121 02 01012             2         
100-150: 28888  99121208  2222 222222180                   
150-200:                  161        112278899 999999992222

Line 2:
0-50: 01              0102042022     11002022000002     
50-100:             11 121 02 01012             2         
100-150: 18888  03228206  2222 1122223                     
150-200:                  341        121228805 015005032222

Line 3:
0-50: 01              0102022022     11002022000003     
50-100:             11 121 012                  2         
100-150: 202030201121105  2222 1222223                     
150-200:                  161        2 1278810 014005022222


In [18]:
print("Exploring patterns based on positions")
df_explore = df_brfss.select(
    substring(col("value"), 1, 2).alias("state_code"),   
    
    
    substring(col("value"), 101, 1).alias("first_digit_101"),  
    substring(col("value"), 102, 1).alias("second_digit_102"),   
    substring(col("value"), 103, 1).alias("third_digit_103"),    
    substring(col("value"), 104, 1).alias("fourth_digit_104"),   
    substring(col("value"), 105, 1).alias("fifth_digit_105"),    
    
    
    substring(col("value"), 108, 2).alias("chars_108_109"),    
    
    
    substring(col("value"), 65, 6).alias("chars_65_70"),       
    substring(col("value"), 72, 6).alias("chars_72_77"),      
    
    
    substring(col("value"), 15, 4).alias("chars_15_18"),        
    substring(col("value"), 80, 5).alias("chars_80_84"),        
).limit(10)

df_explore.show(truncate=False)

Exploring patterns based on positions
+----------+---------------+----------------+---------------+----------------+---------------+-------------+-----------+-----------+-----------+-----------+
|state_code|first_digit_101|second_digit_102|third_digit_103|fourth_digit_104|fifth_digit_105|chars_108_109|chars_65_70|chars_72_77|chars_15_18|chars_80_84|
+----------+---------------+----------------+---------------+----------------+---------------+-------------+-----------+-----------+-----------+-----------+
|01        |2              |8               |8              |8               |8              |99           | 121 0     | 01012     |  01       |           |
|01        |1              |8               |8              |8               |8              |03           | 121 0     | 01012     |  01       |           |
|01        |2              |0               |2              |0               |3              |01           | 121 0     |2          |  01       |           |
|01        |1       

In [19]:
print("Extracting codes with correct positions")

df_codes_corrected = df_brfss.select(
    substring(col("value"), 1, 2).alias("state_code"),     
    substring(col("value"), 101, 1).alias("age_code"),      
    substring(col("value"), 102, 1).alias("gender_code"),    
    substring(col("value"), 108, 1).alias("diabetes_code"), 
    substring(col("value"), 109, 1).alias("exercise_code")  
)

df_codes_corrected.cache()

print("Codes extracted with corrected positions")
print("\n Codes:")
df_codes_corrected.show(10)

print("\nCode distributions per field:")
print("Unique states:")
df_codes_corrected.select("state_code").distinct().show()

print("Unique code per age:")
df_codes_corrected.select("age_code").distinct().orderBy("age_code").show()

print("Unique code per region:")
df_codes_corrected.select("gender_code").distinct().orderBy("gender_code").show()

print("Unique diabetes codes:")
df_codes_corrected.select("diabetes_code").distinct().orderBy("diabetes_code").show()

print("Unique exercise codes")
df_codes_corrected.select("exercise_code").distinct().orderBy("exercise_code").show()

Extracting codes with correct positions
Codes extracted with corrected positions

 Codes:
+----------+--------+-----------+-------------+-------------+
|state_code|age_code|gender_code|diabetes_code|exercise_code|
+----------+--------+-----------+-------------+-------------+
|        01|       2|          8|            9|            9|
|        01|       1|          8|            0|            3|
|        01|       2|          0|            0|            1|
|        01|       1|          8|            9|            9|
|        01|       4|          0|            0|            7|
|        01|       5|          0|            0|            3|
|        01|       2|          8|            0|            2|
|        01|       3|          8|            0|            3|
|        01|       3|          8|            0|            3|
|        01|       3|          0|            0|            3|
+----------+--------+-----------+-------------+-------------+
only showing top 10 rows


Code distributi

In [20]:
df_step1 = df_codes_corrected.withColumn("state_name",
    when(col("state_code") == "01", "Alabama")
    .when(col("state_code") == "02", "Alaska")
    .when(col("state_code") == "04", "Arizona")
    .when(col("state_code") == "05", "Arkansas")
    .when(col("state_code") == "06", "California")
    .when(col("state_code") == "08", "Colorado")
    .when(col("state_code") == "09", "Connecticut")
    .when(col("state_code") == "10", "Delaware")
    .when(col("state_code") == "11", "District of Columbia")
    .when(col("state_code") == "12", "Florida")
    .when(col("state_code") == "13", "Georgia")
    .when(col("state_code") == "15", "Hawaii")
    .when(col("state_code") == "16", "Idaho")
    .when(col("state_code") == "17", "Illinois")
    .when(col("state_code") == "18", "Indiana")
    .when(col("state_code") == "19", "Iowa")
    .when(col("state_code") == "20", "Kansas")
    .when(col("state_code") == "21", "Kentucky")
    .when(col("state_code") == "22", "Louisiana")
    .when(col("state_code") == "23", "Maine")
    .when(col("state_code") == "24", "Maryland")
    .when(col("state_code") == "25", "Massachusetts")
    .when(col("state_code") == "26", "Michigan")
    .when(col("state_code") == "27", "Minnesota")
    .when(col("state_code") == "28", "Mississippi")
    .when(col("state_code") == "29", "Missouri")
    .when(col("state_code") == "30", "Montana")
    .when(col("state_code") == "31", "Nebraska")
    .when(col("state_code") == "32", "Nevada")
    .when(col("state_code") == "33", "New Hampshire")
    .when(col("state_code") == "34", "New Jersey")
    .when(col("state_code") == "35", "New Mexico")
    .when(col("state_code") == "36", "New York")
    .when(col("state_code") == "37", "North Carolina")
    .when(col("state_code") == "38", "North Dakota")
    .when(col("state_code") == "39", "Ohio")
    .when(col("state_code") == "40", "Oklahoma")
    .when(col("state_code") == "41", "Oregon")
    .when(col("state_code") == "42", "Pennsylvania")
    .when(col("state_code") == "44", "Rhode Island")
    .when(col("state_code") == "45", "South Carolina")
    .when(col("state_code") == "46", "South Dakota")
    .when(col("state_code") == "47", "Tennessee")
    .when(col("state_code") == "48", "Texas")
    .when(col("state_code") == "49", "Utah")
    .when(col("state_code") == "50", "Vermont")
    .when(col("state_code") == "51", "Virginia")
    .when(col("state_code") == "53", "Washington")
    .when(col("state_code") == "54", "West Virginia")
    .when(col("state_code") == "55", "Wisconsin")
    .when(col("state_code") == "56", "Wyoming")
    # Territorios adicionales
    .when(col("state_code") == "66", "Guam")
    .when(col("state_code") == "72", "Puerto Rico")
    .when(col("state_code") == "78", "US Virgin Islands")
    .otherwise("Estado/Territorio Desconocido")
)


df_step1.cache()
print("State data transormed")
print("States that are available in data:")
df_step1.select("state_code", "state_name").distinct().orderBy("state_name").show()

print(f"Total de registros: {df_step1.count()}")

State data transormed
States that are available in data:
+----------+----------+
|state_code|state_name|
+----------+----------+
|        01|   Alabama|
+----------+----------+

Total de registros: 1000


In [21]:
df_step2 = df_step1.withColumn("age_group",
    when(col("age_code") == "1", "18-24 yrs")
    .when(col("age_code") == "2", "25-34 yrs") 
    .when(col("age_code") == "3", "35-44 yrs")
    .when(col("age_code") == "4", "45-54 yrs")
    .when(col("age_code") == "5", "55-64 yrs")
    .when(col("age_code") == "6", "65-74 yrs")
    .when(col("age_code") == "7", "75+ yrs")
    .when(col("age_code") == "9", "Not specified")
    .otherwise("Unknown Age")
)

df_step2.cache()
print("Age Groups")
print("Distribution per age:")
df_step2.groupBy("age_group").count().orderBy(F.desc("count")).show()

Age Groups
Distribution per age:
+-------------+-----+
|    age_group|count|
+-------------+-----+
|    35-44 yrs|  376|
|    45-54 yrs|  230|
|    25-34 yrs|  229|
|    55-64 yrs|   83|
|    18-24 yrs|   80|
|      75+ yrs|    1|
|Not specified|    1|
+-------------+-----+



In [22]:
df_step3 = df_step2.withColumn("gender",
    when(col("gender_code") == "1", "Male")
    .when(col("gender_code") == "2", "Female")
    .when(col("gender_code") == "0", "Not reported")
    .when(col("gender_code") == "3", "Other")
    .when(col("gender_code") == "7", "Does not know")
    .when(col("gender_code") == "8", "Not specified")
    .when(col("gender_code") == "9", "Refuse to answer")
    .otherwise("Not clasified")
)

df_step3.cache()
print("Genre transformed")
print("Distribution per genre:")
df_step3.groupBy("gender").count().orderBy(F.desc("count")).show()

Genre transformed
Distribution per genre:
+----------------+-----+
|          gender|count|
+----------------+-----+
|   Not specified|  556|
|    Not reported|  191|
|           Other|  117|
|            Male|   74|
|   Does not know|   36|
|          Female|   20|
|Refuse to answer|    6|
+----------------+-----+



In [23]:
df_step4 = df_step3.withColumn("diabetes_status",
    when(col("diabetes_code") == "1", "Yes")
    .when(col("diabetes_code") == "2", "Sí, but only during pregnancy")
    .when(col("diabetes_code") == "3", "No, prediabetes or borderline")
    .when(col("diabetes_code") == "4", "Not diabetic")
    .when(col("diabetes_code") == "0", "Not reported")
    .when(col("diabetes_code") == "7", "Not sure")
    .when(col("diabetes_code") == "8", "Not specified")
    .when(col("diabetes_code") == "9", "Refused to answer")
    .otherwise("Estado desconocido")
)

df_step4.cache()
print("Diabetes values transformed")
print("Diabetes distribution:")
df_step4.groupBy("diabetes_status").count().orderBy(F.desc("count")).show()

Diabetes values transformed
Diabetes distribution:
+-----------------+-----+
|  diabetes_status|count|
+-----------------+-----+
|     Not reported|  940|
|Refused to answer|   23|
|              Yes|   18|
|    Not specified|   15|
|         Not sure|    4|
+-----------------+-----+



In [24]:
df_final = df_step4.withColumn("exercise_status",
    when(col("exercise_code") == "1", "Yes, exercises")
    .when(col("exercise_code") == "2", "Does not exercise")
    .when(col("exercise_code") == "0", "Not reported")
    .when(col("exercise_code") == "3", "Limited activity")
    .when(col("exercise_code") == "4", "Only light exercise")
    .when(col("exercise_code") == "5", "Moderate exercise")
    .when(col("exercise_code") == "7", "Doesn't know/Not sure")
    .when(col("exercise_code") == "8", "Not specified")
    .when(col("exercise_code") == "9", "Refused to answer")
    .otherwise("Unknown status")
)

df_final.cache()
print("Exercise column transformed")
print("Exercise distribution:")
df_final.groupBy("exercise_status").count().orderBy(F.desc("count")).show()

Exercise column transformed
Exercise distribution:
+--------------------+-----+
|     exercise_status|count|
+--------------------+-----+
|    Limited activity|  654|
|      Yes, exercises|  139|
|   Does not exercise|   72|
|Doesn't know/Not ...|   37|
|   Moderate exercise|   31|
|   Refused to answer|   30|
|        Not reported|   18|
|       Not specified|   15|
| Only light exercise|    4|
+--------------------+-----+



In [25]:
print(f"Total records processed: {df_final.count()}")

print("\n Sample of transformed data")
df_sample_readable = df_final.select(
    "state_name", 
    "age_group", 
    "gender", 
    "diabetes_status", 
    "exercise_status"
).limit(15)

df_sample_readable.show(truncate=False)

Total records processed: 1000

 Sample of transformed data
+----------+---------+-------------+-----------------+---------------------+
|state_name|age_group|gender       |diabetes_status  |exercise_status      |
+----------+---------+-------------+-----------------+---------------------+
|Alabama   |25-34 yrs|Not specified|Refused to answer|Refused to answer    |
|Alabama   |18-24 yrs|Not specified|Not reported     |Limited activity     |
|Alabama   |25-34 yrs|Not reported |Not reported     |Yes, exercises       |
|Alabama   |18-24 yrs|Not specified|Refused to answer|Refused to answer    |
|Alabama   |45-54 yrs|Not reported |Not reported     |Doesn't know/Not sure|
|Alabama   |55-64 yrs|Not reported |Not reported     |Limited activity     |
|Alabama   |25-34 yrs|Not specified|Not reported     |Does not exercise    |
|Alabama   |35-44 yrs|Not specified|Not reported     |Limited activity     |
|Alabama   |35-44 yrs|Not specified|Not reported     |Limited activity     |
|Alabama   |35-44

In [28]:
df_descriptions = df_final.select(
    F.concat_ws(" of ", 
        col("gender"),
        col("age_group")
    ).alias("person_description"),
    F.concat_ws(" who ", 
        col("state_name"),
        col("diabetes_status"),
        F.concat_ws(" and ", lit(""), col("exercise_status"))
    ).alias("health_description")
).limit(10)

df_descriptions.show(truncate=False)

+--------------------------+--------------------------------------------------------+
|person_description        |health_description                                      |
+--------------------------+--------------------------------------------------------+
|Not specified of 25-34 yrs|Alabama who Refused to answer who  and Refused to answer|
|Not specified of 18-24 yrs|Alabama who Not reported who  and Limited activity      |
|Not reported of 25-34 yrs |Alabama who Not reported who  and Yes, exercises        |
|Not specified of 18-24 yrs|Alabama who Refused to answer who  and Refused to answer|
|Not reported of 45-54 yrs |Alabama who Not reported who  and Doesn't know/Not sure |
|Not reported of 55-64 yrs |Alabama who Not reported who  and Limited activity      |
|Not specified of 25-34 yrs|Alabama who Not reported who  and Does not exercise     |
|Not specified of 35-44 yrs|Alabama who Not reported who  and Limited activity      |
|Not specified of 35-44 yrs|Alabama who Not reported w

In [29]:
df_final.createOrReplaceTempView("health_data_clean")

print("1. Diabetes by Age Group:")
spark.sql("""
    SELECT age_group, diabetes_status, COUNT(*) as count
    FROM health_data_clean 
    WHERE diabetes_status != 'Not reported'
    GROUP BY age_group, diabetes_status
    ORDER BY age_group, count DESC
""").show()

print("2. Exercise by Gender:")
spark.sql("""
    SELECT gender, exercise_status, COUNT(*) as count
    FROM health_data_clean 
    WHERE gender NOT IN ('Not specified', 'Not reported') 
    AND exercise_status != 'Not reported'
    GROUP BY gender, exercise_status
    ORDER BY gender, count DESC
""").show()

print("DataFrame 'df_final' ready for advanced analysis")
print("SQL view 'health_data_clean' created")

1. Diabetes by Age Group:
+---------+-----------------+-----+
|age_group|  diabetes_status|count|
+---------+-----------------+-----+
|18-24 yrs|              Yes|    1|
|18-24 yrs|Refused to answer|    1|
|25-34 yrs|Refused to answer|    5|
|25-34 yrs|    Not specified|    3|
|25-34 yrs|              Yes|    3|
|35-44 yrs|Refused to answer|   11|
|35-44 yrs|              Yes|    6|
|35-44 yrs|    Not specified|    3|
|35-44 yrs|         Not sure|    2|
|45-54 yrs|    Not specified|    8|
|45-54 yrs|              Yes|    5|
|45-54 yrs|Refused to answer|    4|
|45-54 yrs|         Not sure|    2|
|55-64 yrs|              Yes|    3|
|55-64 yrs|Refused to answer|    2|
|55-64 yrs|    Not specified|    1|
+---------+-----------------+-----+

2. Exercise by Gender:
+-------------+--------------------+-----+
|       gender|     exercise_status|count|
+-------------+--------------------+-----+
|Does not know|    Limited activity|   25|
|Does not know|      Yes, exercises|    6|
|Does not know|