# Ingest population using water safely (overall)

In [0]:
# Show the workspace's current DBFS mount points as a table before reading any data.
mounts = dbutils.fs.mounts()
display(mounts)

mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/mnt/waterprojectdl/bronze,abfss://bronze@waterprojectdl.dfs.core.windows.net/,
/Volumes,UnityCatalogVolumes,
/mnt/waterprojectdl-raw,abfss://raw@waterprojectdl.dfs.core.windows.net/,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/mnt/waterprojectdl/raw,abfss://raw@waterprojectdl.dfs.core.windows.net/,
/databricks/mlflow-registry,databricks/mlflow-registry,
/Volume,DbfsReserved,
/volumes,DbfsReserved,


In [0]:
%fs 
ls /mnt/waterprojectdl/raw

In [0]:
# Load the raw CSV from the mounted "raw" container, using its first line as column names.
raw_csv_path = "dbfs:/mnt/waterprojectdl/raw/population_using_water_safely_overall.csv"
overall = spark.read.option("header", True).csv(raw_csv_path)

In [0]:
from process import remove_specific_rows

# NOTE(review): 24 and 59 are passed positionally to the project helper; their exact
# meaning (row positions? inclusive bounds?) is defined in process.remove_specific_rows
# and is not visible here -- confirm before changing them.
row_bounds = (24, 59)
overall_num_cleaned = remove_specific_rows(overall, *row_bounds)

In [0]:
from pyspark.sql.functions import col, count

columns = overall_num_cleaned.columns

# Find the columns that contain no non-null value at all.
# count(<column>) ignores nulls, so a single aggregation returns the non-null
# tally of every column in one Spark job, instead of launching a separate
# filter-and-count job per column.
if columns:
    non_null_counts = overall_num_cleaned.agg(*[count(col(column)) for column in columns]).first()
else:
    # agg() rejects an empty expression list; with no columns there is nothing to report.
    non_null_counts = ()

# The aggregated Row is positional, so it lines up with `columns` index by index.
null_columns = [column for column, non_null in zip(columns, non_null_counts) if non_null == 0]

null_columns

Out[4]: ['SDG',
 'SDG target',
 'Indicator Code',
 'Geographical area code',
 'Time detail',
 'Footnote',
 'Type of data',
 'Units',
 'Age group',
 'Bounds',
 'Frequency',
 'Level/Status',
 'Type of reporting',
 'Sex']

In [0]:
# Drop the all-null columns found above plus two further columns that are removed by name.
columns_to_drop = [*null_columns, "SDG indicator", "SDG 6 Data portal level"]
overall_cleaned_col = overall_num_cleaned.drop(*columns_to_drop)
display(overall_cleaned_col)

Indicator name,Geographical area name,Year,Value,Source,Location
"Drinking water,Safely managed service",Morocco,2000,41.516566077849,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2001,41.955470588583,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2002,42.393078989123,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2003,42.828273466588,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2004,43.261617365472,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2005,43.805396222483,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2006,44.369282773552,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2007,44.929388561225,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2008,47.739590836905,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2009,50.602361224205,"WHO, UNICEF",National


In [0]:
# Give the measure and the year numeric types: Value -> double, Year -> integer.
value_as_double = col("Value").cast("double")
year_as_integer = col("Year").cast("integer")
overall_cleaned_col = (
    overall_cleaned_col
    .withColumn("Value", value_as_double)
    .withColumn("Year", year_as_integer)
)

In [0]:
# Print the column names and types of the cleaned DataFrame to check the Year/Value casts.
overall_cleaned_col.printSchema()

root
 |-- Indicator name: string (nullable = true)
 |-- Geographical area name: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Value: double (nullable = true)
 |-- Source: string (nullable = true)
 |-- Location: string (nullable = true)



In [0]:
# Show Value per Year in chronological order (this is the input to the visualization).
yearly_values = overall_cleaned_col.select("Year", "Value").sort("Year")
display(yearly_values)

Year,Value
2000,41.516566077849
2001,41.955470588583
2002,42.393078989123
2003,42.828273466588
2004,43.261617365472
2005,43.805396222483
2006,44.369282773552
2007,44.929388561225
2008,47.739590836905
2009,50.602361224205


Databricks visualization. Run in Databricks to view.

## Write the DataFrame as a Parquet file to the processed folder


In [0]:
from wtr_utils import mount_dbfs
# Mount the "processed" container through the project helper.
# NOTE(review): the arguments look like (storage account, container), and the cell output
# reports a mount at /mnt/waterprojectdl/processed -- confirm against wtr_utils.mount_dbfs.
# What the helper returns (kept in processed_mount) is not visible here.
processed_mount = mount_dbfs("waterprojectdl", "processed")

Attempting to mount processed to /mnt/waterprojectdl/processed
Mounting processed to /mnt/waterprojectdl/processed
Successfully mounted processed to /mnt/waterprojectdl/processed


In [0]:
# Persist the cleaned DataFrame as Parquet, replacing any output from a previous run.
overall_cleaned_col.write.parquet("dbfs:/mnt/waterprojectdl/processed/overall", mode="overwrite")

In [0]:
# List what the Parquet write produced in the output directory.
# The path is given as a fully qualified dbfs:/ URI, matching the write and the other
# listing cell, rather than a scheme-less relative path that depends on how dbutils
# resolves it.
files = dbutils.fs.ls("dbfs:/mnt/waterprojectdl/processed/overall")
for file in files:
    print(file.path)

dbfs:/mnt/waterprojectdl/processed/overall/_SUCCESS
dbfs:/mnt/waterprojectdl/processed/overall/_committed_1488123185496759241
dbfs:/mnt/waterprojectdl/processed/overall/_started_1488123185496759241
dbfs:/mnt/waterprojectdl/processed/overall/part-00000-tid-1488123185496759241-f9c42eb7-268e-4dfe-bf7c-3c6d93d2085b-54-1-c000.snappy.parquet


In [0]:
# Report the contents of the Parquet output directory, or say so if it is empty.
output_path = "dbfs:/mnt/waterprojectdl/processed/overall"
files = dbutils.fs.ls(output_path)
if not files:
    print(f"No files found in {output_path}")
else:
    print(f"Files written to {output_path}:")
    # One path per line, same as printing each entry in turn.
    print("\n".join(entry.path for entry in files))

Files written to dbfs:/mnt/waterprojectdl/processed/overall:
dbfs:/mnt/waterprojectdl/processed/overall/_SUCCESS
dbfs:/mnt/waterprojectdl/processed/overall/_committed_1488123185496759241
dbfs:/mnt/waterprojectdl/processed/overall/_started_1488123185496759241
dbfs:/mnt/waterprojectdl/processed/overall/part-00000-tid-1488123185496759241-f9c42eb7-268e-4dfe-bf7c-3c6d93d2085b-54-1-c000.snappy.parquet


In [0]:
# List the mount points again; the output now includes /mnt/waterprojectdl/processed.
dbutils.fs.mounts()

Out[13]: [MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/mnt/waterprojectdl/bronze', source='abfss://bronze@waterprojectdl.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/Volumes', source='UnityCatalogVolumes', encryptionType=''),
 MountInfo(mountPoint='/mnt/waterprojectdl-raw', source='abfss://raw@waterprojectdl.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/mnt/waterprojectdl/raw', source='abfss://raw@waterprojectdl.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/mnt/waterprojectdl/processed', source='abfss://processed@waterprojectdl.dfs.core.wi

In [0]:
# Read the Parquet output back in and display it as a round-trip check of the write.
df = spark.read.format("parquet").load("/mnt/waterprojectdl/processed/overall")
display(df)

Indicator name,Geographical area name,Year,Value,Source,Location
"Drinking water,Safely managed service",Morocco,2000,41.516566077849,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2001,41.955470588583,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2002,42.393078989123,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2003,42.828273466588,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2004,43.261617365472,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2005,43.805396222483,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2006,44.369282773552,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2007,44.929388561225,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2008,47.739590836905,"WHO, UNICEF",National
"Drinking water,Safely managed service",Morocco,2009,50.602361224205,"WHO, UNICEF",National
