# Data Pipeline

#### Data ingestion

In [1]:
import pandas as pd

In [2]:
df = pd.read_csv('UNdata_literacy rate.csv', delimiter=',')

#### Eksplor Data 

In [3]:
df.head(15)

Unnamed: 0,Reference Area,Time Period,Sex,Age group,Units of measurement,Observation Value
0,Afghanistan,1979,Female,15-24 year olds,Percent,11.1428
1,Afghanistan,2011,Female,15-24 year olds,Percent,32.113223
2,Afghanistan,2015,Female,15-24 year olds,Percent,46.10599
3,Afghanistan,1979,Male,15-24 year olds,Percent,45.79602
4,Afghanistan,2011,Male,15-24 year olds,Percent,61.879069
5,Afghanistan,2015,Male,15-24 year olds,Percent,69.42052
6,Afghanistan,1979,All genders,15-24 year olds,Percent,30.06635
7,Afghanistan,2011,All genders,15-24 year olds,Percent,46.990051
8,Afghanistan,2015,All genders,15-24 year olds,Percent,58.1549
9,Albania,2001,All genders,15-24 year olds,Percent,99.43515


In [4]:
df.tail()

Unnamed: 0,Reference Area,Time Period,Sex,Age group,Units of measurement,Observation Value
4005,Zimbabwe,1982,Male,15-24 year olds,Percent,93.82696
4006,Zimbabwe,1992,Male,15-24 year olds,Percent,96.54062
4007,Zimbabwe,2011,Male,15-24 year olds,Percent,89.59058
4008,Zimbabwe,2015,Male,15-24 year olds,Percent,89.96682
4009,Zimbabwe,2015,Female,15-24 year olds,Percent,93.50235


In [5]:
print(df.info())                                         # menampilkan type data dan jumlah Non null

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4010 entries, 0 to 4009
Data columns (total 6 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   Reference Area        4010 non-null   object 
 1   Time Period           4010 non-null   int64  
 2   Sex                   4010 non-null   object 
 3   Age group             4010 non-null   object 
 4   Units of measurement  4010 non-null   object 
 5   Observation Value     4010 non-null   float64
dtypes: float64(1), int64(1), object(4)
memory usage: 188.1+ KB
None


In [6]:
#menampilkan statistik deskriptif dengan format yang lebih terstuktur dan mudah dibaca(T=menukar baris menjadi kolom,dan kolom menjadi baris)
print("Summary Statistics:")
df.describe().T

Summary Statistics:


Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Time Period,4010.0,2002.961596,10.318646,1975.0,1996.0,2005.0,2012.0,2015.0
Observation Value,4010.0,86.778156,16.830429,6.66406,78.474035,94.57494,98.883607,100.0


In [7]:
# cek missing values
print("\nMissing Values:")
print(df.isnull().sum())


Missing Values:
Reference Area          0
Time Period             0
Sex                     0
Age group               0
Units of measurement    0
Observation Value       0
dtype: int64


Karna tidak terdapat nilai yang kosong, data duplikasi, atau data yang tidak relevan. maka, tidak di lakukan cleanig data. jadi, bisa di lakukan analisis data yang relevan untuk analisis yang lebih akurat.

#### Validasi Data

In [8]:
!pip install -q "great-expectations==0.18.19"

In [9]:
# Membuat data context 
from great_expectations.data_context import FileDataContext

context = FileDataContext.create(project_root_dir='./')

In [10]:
# Give a name to a Datasource. This name must be unique between Datasources.
datasource_name = 'csv literacy rate data'
datasource = context.sources.add_pandas('UNdata_literacy rate.csv')

# Give a name to a data asset
asset_name = 'literacy rate'
path_to_data = r'C:\Users\auliya rizki\Hactiv8\project\UNdata_literacy rate.csv'
asset = datasource.add_csv_asset(asset_name, filepath_or_buffer=path_to_data)

# Build batch request
batch_request = asset.build_batch_request()

DataContextError: Can not write the fluent datasource UNdata_literacy rate.csv because a datasource of that name already exists in the data context.

In [13]:
# Creat an expectation suite
expectation_suite_name = 'expectation-literacy-rate-dataset'
context.add_or_update_expectation_suite(expectation_suite_name)

# Create a validator using above expectation suite
validator = context.get_validator(
    batch_request = batch_request,
    expectation_suite_name = expectation_suite_name
)

# Check the validator
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,Reference Area,Time Period,Sex,Age group,Units of measurement,Observation Value
0,Afghanistan,1979,Female,15-24 year olds,Percent,11.1428
1,Afghanistan,2011,Female,15-24 year olds,Percent,32.113223
2,Afghanistan,2015,Female,15-24 year olds,Percent,46.10599
3,Afghanistan,1979,Male,15-24 year olds,Percent,45.79602
4,Afghanistan,2011,Male,15-24 year olds,Percent,61.879069


#### Automasi Data Pipeline

In [14]:
import pandas as pd

def load_data(file_path: str) -> pd.DataFrame:
    """
    Fungsi untuk membaca data dari file dan mengembalikan Pandas DataFrame.

    Parameters:
        file_path (str): Path atau URL ke file dataset (CSV, Excel, dll).

    Returns:
        pd.DataFrame: Data yang telah dimuat dalam bentuk Pandas DataFrame.
    """
    try:
        # Membaca file menggunakan pandas
        data = pd.read_csv(file_path)  # Pastikan menggunakan parameter file_path
        print(f"Data berhasil dibaca dari: {file_path}")
        return data
    except Exception as e:
        print(f"Terjadi kesalahan saat membaca file: {e}")
        raise  # Melempar ulang exception agar bisa ditangani lebih lanjut

# Memanggil fungsi dengan path file yang sesuai
file_path = 'UNdata_literacy rate.csv'
load_data(file_path)  # Ini memanggil fungsi


Data berhasil dibaca dari: UNdata_literacy rate.csv


Unnamed: 0,Reference Area,Time Period,Sex,Age group,Units of measurement,Observation Value
0,Afghanistan,1979,Female,15-24 year olds,Percent,11.142800
1,Afghanistan,2011,Female,15-24 year olds,Percent,32.113223
2,Afghanistan,2015,Female,15-24 year olds,Percent,46.105990
3,Afghanistan,1979,Male,15-24 year olds,Percent,45.796020
4,Afghanistan,2011,Male,15-24 year olds,Percent,61.879069
...,...,...,...,...,...,...
4005,Zimbabwe,1982,Male,15-24 year olds,Percent,93.826960
4006,Zimbabwe,1992,Male,15-24 year olds,Percent,96.540620
4007,Zimbabwe,2011,Male,15-24 year olds,Percent,89.590580
4008,Zimbabwe,2015,Male,15-24 year olds,Percent,89.966820


##### Proses ETL ke Mongodb dengan membuat file 'transform.py' dan 'load.py' dengan terpisah 

In [11]:
# Koneksi ke MongoDB
client = pymongo.MongoClient("mongodb+srv://auliarizkirumahhorbo:auliya@aul-p.fc96f.mongodb.net/")  # Ganti dengan URL MongoDB Anda
db = client["literacy_db"]  # Nama database
collection = db["literacy_data"]  # Nama koleksi

NameError: name 'pymongo' is not defined

##### Membuat apache airflow untuk menjadwalkan dan memantau workflow 