In [0]:
"""
This folder contains all necessary paths for the project.
"""

# LOCAL
LOCAL_TMP = "/local_disk0/tmp/citibike"

# ADLS
ADLS_DOWNLOADS  = "abfss://citibike@databricksjm.dfs.core.windows.net/downloads"
ADLS_BRONZE     = "abfss://citibike@databricksjm.dfs.core.windows.net/bronze"
ADLS_SILVER     = "abfss://citibike@databricksjm.dfs.core.windows.net/silver"
ADLS_GOLD       = "abfss://citibike@databricksjm.dfs.core.windows.net/gold"

# S3 BUCKET
S3_BUCKET       = "https://s3.amazonaws.com/tripdata"

import os, re, requests, xml.etree.ElementTree as ET
from typing import Optional, Tuple

# Matches a bare archive file name such as "2018-citibike-tripdata.zip" (4-digit prefix)
# or "202401-citibike-tripdata.zip" (6-digit prefix) — presumably yearly vs. monthly
# archives. Group 1 captures the YYYY / YYYYMM prefix. Meant to be used with fullmatch().
ZIP_RE = re.compile(r"(\d{4}(?:\d{2})?)-citibike-tripdata\.zip")

def _session() -> requests.Session:
    """Build an HTTP session whose requests carry the ingestion job's User-Agent."""
    session = requests.Session()
    session.headers["User-Agent"] = "citibike-ingestion/1.0"
    return session

def _download_stream(sess: requests.Session, src_url: str, dest_path: str, chunk=8 * 1024 * 1024):
    """
    Stream the body of ``src_url`` into ``dest_path`` piece by piece.

    Args:
        sess: session used for the GET request.
        src_url: URL to download.
        dest_path: local file to create/overwrite.
        chunk: read size in bytes for each streamed piece (default 8 MiB).

    Raises:
        requests.HTTPError: on a non-2xx response (raised before the file is opened).

    A failure mid-transfer can leave a partially written file at ``dest_path``.
    """
    with sess.get(src_url, stream=True, timeout=30) as response:
        response.raise_for_status()
        with open(dest_path, "wb") as sink:
            # filter(None, ...) drops empty keep-alive chunks
            for piece in filter(None, response.iter_content(chunk_size=chunk)):
                sink.write(piece)

def _list_s3_keys(sess: requests.Session, s3_url: str) -> list:
    """
    Return every object key in a public S3 bucket using ListObjectsV2.

    A single ListObjectsV2 response holds at most 1000 keys, so this keeps requesting
    with ``continuation-token`` until ``IsTruncated`` is no longer "true".

    Raises:
        requests.HTTPError: if any listing request fails.
    """
    list_url = f"{s3_url.rstrip('/')}/"
    params = {"list-type": "2"}
    keys = []
    while True:
        resp = sess.get(list_url, params=params, timeout=20)
        resp.raise_for_status()
        # Parse raw bytes so the encoding in the XML declaration is honoured.
        root = ET.fromstring(resp.content)
        keys.extend(e.text for e in root.iterfind(".//{*}Key"))
        token = root.findtext("{*}NextContinuationToken")
        if root.findtext("{*}IsTruncated") != "true" or not token:
            return keys
        params = {"list-type": "2", "continuation-token": token}


def download_data(
    s3_url: str,
    adls_dir_url: str,
    dbutils,
    local_dir: str,
    year_filter: Optional[Tuple[int, int]] = None,
    limit: Optional[int] = None
):
    """
    Copy Citibike trip-data ZIP archives from an S3 bucket into an ADLS folder.

    Each bucket object whose file name fully matches ``ZIP_RE`` is streamed to
    ``local_dir`` and then moved to ``adls_dir_url``. Archives whose file name is
    already present in ``adls_dir_url`` are skipped, so re-runs are incremental.

    Args:
        s3_url: base HTTPS URL of the bucket.
        adls_dir_url: destination ADLS directory; created if missing.
        dbutils: Databricks ``dbutils`` handle used for the ADLS operations.
        local_dir: local staging directory; created if missing.
        year_filter: optional inclusive ``(first_year, last_year)``; archives whose
            year (first four digits of the name) falls outside are ignored.
        limit: optional maximum number of new downloads; ``None`` means no limit
            (``0`` means download nothing).

    Returns:
        dict with ``downloaded``, ``skipped`` and ``failed`` counts.

    Raises:
        requests.HTTPError: if listing the bucket fails. Individual download/move
            failures are reported, counted in ``failed`` and do not stop the loop.
    """
    os.makedirs(local_dir, exist_ok=True)
    dbutils.fs.mkdirs(adls_dir_url)

    # File names already in ADLS (directory entries come back with a trailing "/").
    try:
        existing = {fi.name.rstrip("/") for fi in dbutils.fs.ls(adls_dir_url)}
    except Exception:
        # Best effort: if the listing fails, treat the folder as empty (nothing skipped).
        existing = set()

    base_url = s3_url.rstrip("/")
    downloaded = skipped = failed = 0

    with _session() as sess:
        keys = _list_s3_keys(sess, s3_url)

        for i, key in enumerate(keys, start=1):
            if limit is not None and downloaded >= limit:
                break

            fname = os.path.basename(key)
            match = ZIP_RE.fullmatch(fname)
            if not match:
                continue

            if year_filter:
                y1, y2 = year_filter
                # group(1) is YYYY or YYYYMM; its first four digits are the year.
                if not (y1 <= int(match.group(1)[:4]) <= y2):
                    continue

            if fname in existing:
                print(f"[{i}] SKIP (ADLS): {fname}")
                skipped += 1
                continue

            # Use the full key so objects stored under a prefix resolve correctly.
            file_url = f"{base_url}/{key}"
            local_path = os.path.join(local_dir, fname)
            adls_path = f"{adls_dir_url}/{fname}"

            print(f"[{i}] Downloading: {fname}")
            try:
                _download_stream(sess, file_url, local_path)
                dbutils.fs.mv("file:" + local_path, adls_path, True)
                print(f"[{i}] MOVED → ADLS: {fname}")
                existing.add(fname)
                downloaded += 1
            except Exception as e:
                print(f"[{i}] FAILED {fname}: {e}")
                failed += 1
                # Remove a partially written file; a cleanup error must not hide the
                # original failure, so it is only reported.
                if os.path.exists(local_path):
                    try:
                        os.remove(local_path)
                    except OSError as cleanup_err:
                        print(f"[{i}] Could not remove {local_path}: {cleanup_err}")

    return {"downloaded": downloaded, "skipped": skipped, "failed": failed}


# Pipeline: Download Citibike ZIP files from S3 and upload them to ADLS (downloads folder).

from pyspark.dbutils import DBUtils
# from utilities.paths import S3_BUCKET, ADLS_DOWNLOADS, LOCAL_TMP
# from utilities.ingestion_utils import download_data


def run(
    year_filter: Optional[Tuple[int, int]] = (2018, 2023),
    limit: Optional[int] = None,
):
    """
    Ingest Citibike archives from the public S3 bucket into the ADLS downloads folder.

    Args:
        year_filter: inclusive ``(first_year, last_year)`` range forwarded to
            ``download_data``; ``None`` disables year filtering. Defaults to the
            previously hard-coded ``(2018, 2023)``.
        limit: optional cap on the number of new downloads; ``None`` means no cap.

    Returns:
        The ``downloaded`` / ``skipped`` / ``failed`` counters from ``download_data``.
    """
    # `spark` is not defined in this file — presumably the notebook's global SparkSession.
    dbu = DBUtils(spark)

    stats = download_data(
        s3_url=S3_BUCKET,
        adls_dir_url=ADLS_DOWNLOADS,
        dbutils=dbu,
        local_dir=LOCAL_TMP,
        year_filter=year_filter,
        limit=limit,
    )

    print("Ingestion finished.")
    print(f"Downloaded: {stats['downloaded']} | Skipped: {stats['skipped']} | Failed: {stats['failed']}")
    return stats


# Notebook cells execute in the ``__main__`` module, so running this cell also starts
# the ingestion.
if __name__ == "__main__":
    run()


```
-przechodzi do external data (ADLS) -> folderu downloads
-iteruje po plikach (ADLS)
  -tworzy folder local_disk0/tmp/citibike 
  -pobiera do local_disk0/tmp/citibike
  -przechodzi do local_disk0/tmp/citibike
  -iteruje po plikach (citibike)
    -sprawdza czy zip:

      TAK:  -tworzy folder local_disk0/tmp/citibike/extracted
            -pobiera cala nazwe archiwum zip
            -pobiera 4 pierwsze znaki archiwum zip (rok)
            -rozpakowuje do tmp/extracted
            -przechodzi do extracted
            -iteruje po kazdym pliku (citibike/extracted)
              -sprawdza czy plik jest folderem i nazwa == nazwa archiwum zip:
                TAK:  -przechodzi do tego folderu:
                      -tworzy folder (nazwa = rok)
                      -iteruje po plikach (citibike/extracted/nazwa):
                        -sprawdza czy nazwa == (rok):
                          TAK:  -continue
                          NIE:  -sprawdza czy zip:
                                    TAK:  -rozpakowuje do (rok)
                                    NIE:  -sprawdza czy folder:
                                            TAK: -zawartosc przenosi do (rok)
                                            NIE: -continue
                      -przenosi (rok) do ADLS bronze
                      -usuwa local_disk0/tmp/citibike

                NIE:  -usuwa local_disk0/tmp/citibike
                      -break

      NIE:  -usuwa local_disk0/tmp/citibike
            -break
```

In [0]:
import os, zipfile, shutil

DOWNLOADS = "abfss://citibike@databricksjm.dfs.core.windows.net/downloads"
BRONZE = "abfss://citibike@databricksjm.dfs.core.windows.net/bronze"
LOCAL_TMP = "/local_disk0/tmp/citibike"
EXTRACTED = f"{LOCAL_TMP}/extracted"
adls_files = dbutils.fs.ls(DOWNLOADS)


def _bronze_year_exists(year: str) -> bool:
    """Return True if the folder ``BRONZE/<year>`` can be listed in ADLS."""
    try:
        dbutils.fs.ls(f"{BRONZE}/{year}")
        return True
    except Exception:
        # dbutils.fs.ls raises for a missing path; any listing failure counts as "absent".
        return False


def _is_old_month_dir(path: str) -> bool:
    """Return True for a legacy month sub-folder whose name has '_' at index 1 or 2 ("1_...", "12_...")."""
    name = os.path.basename(path)
    # Slicing (instead of name[1] / name[2]) cannot raise IndexError on short names.
    return os.path.isdir(path) and "_" in name[1:3]


def _flatten_into_year(archive_root: str, year_dir: str) -> None:
    """
    Gather the contents of one extracted yearly archive into ``year_dir``.

    Two layouts are handled:
    * old type: month sub-folders ("1_...", "12_...") whose contents are moved;
    * new type: nested ``.zip`` files that are extracted.
    Any other entry, including ``year_dir`` itself, is ignored.
    """
    os.makedirs(year_dir, exist_ok=True)
    for entry in os.listdir(archive_root):
        entry_path = os.path.join(archive_root, entry)
        if entry_path == year_dir:
            continue
        if _is_old_month_dir(entry_path):
            print("Processing old type")
            for item in os.listdir(entry_path):
                shutil.move(os.path.join(entry_path, item), year_dir)
        elif entry.endswith(".zip"):
            print("Processing new type")
            with zipfile.ZipFile(entry_path, "r") as z:
                z.extractall(year_dir)


# For every archive in the ADLS downloads folder: copy it locally, unpack it,
# flatten its CSVs into a single <year> folder and upload that folder to bronze.
skipped = 0
processed = 0
failed = 0

for adls_file in adls_files:
    adls_filename = adls_file.name
    if not adls_filename.lower().endswith(".zip"):
        print(f"Not a zip archive, ignoring: {adls_filename}")
        continue

    # The archive name starts with the year (YYYY or YYYYMM prefix).
    # NOTE(review): the skip check is per year, so several archives that share a year
    # (e.g. YYYYMM-named ones) would all be skipped after the first — confirm intent.
    year = adls_filename[:4]
    print(f"Now processing: {adls_filename}")

    if _bronze_year_exists(year):
        print(f"{year} Folder already exists.\nSkipping...")
        skipped += 1
        continue
    print(f"{year} folder doesn't exist.\nContinuing...")

    # Start from an empty scratch folder so leftovers from an interrupted run are never
    # mistaken for the current archive.
    shutil.rmtree(LOCAL_TMP, ignore_errors=True)
    os.makedirs(EXTRACTED, exist_ok=True)
    print(f"Created local folder: {LOCAL_TMP}")

    try:
        zip_path = os.path.join(LOCAL_TMP, adls_filename)
        dbutils.fs.cp(f"{DOWNLOADS}/{adls_filename}", f"file:{zip_path}")
        print(f"File copied from ADLS to local: {adls_filename}")

        print(f"Extracting zip file to {EXTRACTED}...")
        with zipfile.ZipFile(zip_path, "r") as z:
            z.extractall(EXTRACTED)

        # Expected layout: the archive contains a top-level folder named like the zip.
        archive_root = os.path.join(EXTRACTED, adls_filename[:-4])
        if not os.path.isdir(archive_root):
            print(f"Unexpected archive layout (no '{adls_filename[:-4]}' folder): {adls_filename}")
            failed += 1
            continue

        year_dir = os.path.join(archive_root, year)
        _flatten_into_year(archive_root, year_dir)

        print(f"Moving extracted files to ADLS: {year}")
        try:
            dbutils.fs.cp(f"file:{year_dir}", f"{BRONZE}/{year}", recurse=True)
        except Exception:
            # A partial upload would make the next run skip this year; remove it.
            dbutils.fs.rm(f"{BRONZE}/{year}", recurse=True)
            raise
        print("############### DONE ############### ")
        processed += 1
    finally:
        shutil.rmtree(LOCAL_TMP, ignore_errors=True)
        print(f"Removed local folder: {LOCAL_TMP}")

print(f"Processed: {processed} | Skipped: {skipped} | Failed: {failed}")


In [0]:
# Sanity check: True means the local scratch folder LOCAL_TMP still exists
# (e.g. left behind by an interrupted extraction run).
os.path.exists(LOCAL_TMP)
# To inspect leftovers: os.listdir(LOCAL_TMP)

```
bronze->silver
-wczytanie danych
-wybor kolumn
-narzucenie typow

dodanie kolumn:
  -dodanie duration_sec (ended_at - started_at)
    -jesli duration_sec > 89970: odrzucic
  -rok
  -miesiac
  -dzien 
```

# BRONZE -> SILVER

In [0]:
ADLS_BRONZE     = "abfss://citibike@databricksjm.dfs.core.windows.net/bronze"
ADLS_SILVER     = "abfss://citibike@databricksjm.dfs.core.windows.net/silver"


def read_bronze_year(year, base_dir=ADLS_BRONZE):
    """
    Read the raw CSVs of one bronze year folder.

    The header row supplies the column names; no schema is inferred, so every column
    is read as a string.
    """
    return (
        spark.read
        .format("csv")
        .option("header", True)
        .load(f"{base_dir}/{year}")
    )


# Sample years used for the schema exploration below.
# NOTE(review): confirm the 2020 folder really has the new layout (`new_format(df_new)`).
df_old = read_bronze_year(2014)
df_new = read_bronze_year(2020)

In [0]:
# Inspect the column names/types Spark read from the 2020 bronze CSVs.
df_new.printSchema()

In [0]:
from pyspark.sql.types import *

# Target silver layout: (column, type) pairs; trailing comments give the legacy header
# each column comes from. Every field is nullable. Legacy-only columns (bikeid,
# birth year, gender) are deliberately not carried over.
SILVER_SCHEMA = StructType([
    StructField(name, dtype, True)
    for name, dtype in [
        ("ride_id",            StringType()),
        ("rideable_type",      StringType()),
        ("started_at",         TimestampType()),  # starttime
        ("ended_at",           TimestampType()),  # stoptime
        ("start_station_name", StringType()),     # start station name
        ("start_station_id",   StringType()),     # start station id
        ("end_station_name",   StringType()),     # end station name
        ("end_station_id",     StringType()),     # end station id
        ("start_lat",          DoubleType()),     # start station latitude
        ("start_lng",          DoubleType()),     # start station longitude
        ("end_lat",            DoubleType()),     # end station latitude
        ("end_lng",            DoubleType()),     # end station longitude
        ("member_casual",      StringType()),     # usertype (values: subscriber, customer)
    ]
])

# Column names in schema order, and name -> Spark type lookup.
silver_cols = SILVER_SCHEMA.fieldNames()
silver_types = {field.name: field.dataType for field in SILVER_SCHEMA.fields}

In [0]:
# Legacy CSV header -> silver column name.
ALIASES = dict([
    ("starttime",               "started_at"),
    ("stoptime",                "ended_at"),
    ("start station name",      "start_station_name"),
    ("start station id",        "start_station_id"),
    ("end station name",        "end_station_name"),
    ("end station id",          "end_station_id"),
    ("start station latitude",  "start_lat"),
    ("start station longitude", "start_lng"),
    ("end station latitude",    "end_lat"),
    ("end station longitude",   "end_lng"),
    ("usertype",                "member_casual"),
])

# Legacy usertype value (lower-cased) -> silver member_casual value.
VALUE_MAP = dict(subscriber="member", customer="casual")

```
-idzie do bronze
-zapisuje nazwy folderow (lata) do listy
-robi df wedlug lat z listy
-sprawdza format df (stary czy nowy) np if year>2019
 -jesli nowy to wczytuje df z SILVER_SCHEMA
 -jesli stary to wczytuje df bez schemy,
  -robi mapowania i doprowadza do stanu z SILVER_SCHEMA
```


In [0]:
ADLS_BRONZE     = "abfss://citibike@databricksjm.dfs.core.windows.net/bronze"

# Year folders in bronze. dbutils.fs.ls reports directory names with a trailing "/"
# (e.g. "2014/"): keep only those entries and strip the slash so values are plain
# year strings.
years = [fi.name.rstrip("/") for fi in dbutils.fs.ls(ADLS_BRONZE) if fi.name.endswith("/")]
print(years)


In [0]:
def new_format(df):
    """Return True when ``df`` already has the silver-style ``member_casual`` column."""
    return any(name == "member_casual" for name in df.columns)

In [0]:
from pyspark.sql.functions import col, when, lower, lit
def normalize_old(df):
    """
    Convert a legacy-layout Citibike DataFrame to the silver column layout.

    Steps:
    1. add ``ride_id`` / ``rideable_type`` as null string columns (legacy files lack them);
    2. rename legacy columns via ALIASES. Names are compared ignoring case, spaces and
       underscores, so "start station id", "Start Station ID" and "start_station_id"
       all match the same alias;
    3. map ``member_casual`` values subscriber/customer (any case) to member/casual,
       leaving any other value unchanged;
    4. select the silver columns in silver order.

    Column types are not changed here (``to_silver`` does the casting). A legacy column
    that cannot be found is reported with a printed message, and the final ``select``
    then fails on the missing silver column.
    """
    # Cast the placeholders so they are string-typed nulls rather than Spark's
    # untyped NullType.
    df = (
        df
        .withColumn("ride_id", lit(None).cast("string"))
        .withColumn("rideable_type", lit(None).cast("string"))
    )

    ALIASES = {
        "starttime": "started_at",
        "stoptime": "ended_at",
        "start station name": "start_station_name",
        "start station id": "start_station_id",
        "end station name": "end_station_name",
        "end station id": "end_station_id",
        "start station latitude": "start_lat",
        "start station longitude": "start_lng",
        "end station latitude": "end_lat",
        "end station longitude": "end_lng",
        "usertype": "member_casual"
    }

    VALUE_MAP = {
        "subscriber": "member",
        "customer": "casual"
    }

    def _norm(name):
        # Lower-case and drop spaces/underscores: "Start Time" -> "starttime".
        return name.lower().replace(" ", "").replace("_", "")

    # MAPPING COLUMN NAMES
    actual_by_norm = {_norm(c): c for c in df.columns}
    for old_column, new_column in ALIASES.items():
        actual_column = actual_by_norm.get(_norm(old_column))
        if actual_column is not None:
            df = df.withColumnRenamed(actual_column, new_column)
        else:
            print(f"MAPPING COLUMN NAMES ERROR: Couldn't find column: {old_column}")

    # MAPPING VALUES MEMBER_CASUAL
    if "member_casual" in df.columns:
        for old_value, new_value in VALUE_MAP.items():
            df = df.withColumn(
                "member_casual",
                when(lower(col("member_casual")) == old_value, new_value)
                .otherwise(col("member_casual"))
            )
    else:
        print(f"MAPPING VALUES MEMBER_CASUAL ERROR: Couldn't find column: member_casual")

    return df.select(
        "ride_id",
        "rideable_type",
        "started_at",
        "ended_at",
        "start_station_name",
        "start_station_id",
        "end_station_name",
        "end_station_id",
        "start_lat",
        "start_lng",
        "end_lat",
        "end_lng",
        "member_casual"
    )

In [0]:
##### CONVERT TO SILVER FORMAT #####
def to_silver(df):
    """
    Project ``df`` onto the silver schema: select the 13 silver columns in order and
    cast each to its silver type.

    The input must already use silver column names (e.g. the output of
    ``normalize_old`` or a new-format file).

    NOTE(review): timestamps are converted with a plain cast; verify that legacy files
    whose timestamps are not ISO-like (e.g. "M/d/yyyy H:mm") are not turned into nulls.
    """
    schema = StructType([
        StructField(name, dtype, True)
        for name, dtype in [
            ("ride_id",            StringType()),
            ("rideable_type",      StringType()),
            ("started_at",         TimestampType()),  # starttime
            ("ended_at",           TimestampType()),  # stoptime
            ("start_station_name", StringType()),     # start station name
            ("start_station_id",   StringType()),     # start station id
            ("end_station_name",   StringType()),     # end station name
            ("end_station_id",     StringType()),     # end station id
            ("start_lat",          DoubleType()),     # start station latitude
            ("start_lng",          DoubleType()),     # start station longitude
            ("end_lat",            DoubleType()),     # end station latitude
            ("end_lng",            DoubleType()),     # end station longitude
            ("member_casual",      StringType()),     # usertype (values: subscriber, customer)
        ]
    ])
    casted = [col(field.name).cast(field.dataType) for field in schema.fields]
    return df.select(casted)

In [0]:
# Preview the silver conversion with a ride duration in seconds (difference of the
# epoch-second values of ended_at and started_at). Built from df_new directly so this
# cell does not depend on df_new1, which is only defined in a later cell.
display(
    to_silver(df_new)
    .withColumn("duration_sec", col("ended_at").cast("long") - col("started_at").cast("long"))
    .limit(2)
)

In [0]:
# Project the 2020 bronze sample onto the silver schema (silver columns, cast types).
df_new1 = to_silver(df_new)

In [0]:
# Show the raw (uncast, all-string) 2020 bronze sample for comparison with df_new1.
display(df_new)

In [0]:
ADLS_SILVER = "abfss://citibike@databricksjm.dfs.core.windows.net/silver"
df_silver = spark.read.format("delta").load(f"{ADLS_SILVER}/year=2017")