# Reorganize data

In [0]:
# Modules
import os
from pathlib import Path
from shutil import rmtree
import pandas as pd

!pip install tqdm
from tqdm import tqdm

!pip install pyunpack
!pip install patool
from pyunpack import Archive



In [0]:
# Directories
# Root folder of the TransMilenio raw data; every other location in this notebook is built from it.
# It ends with "/" and later cells append suffixes that begin with "/" (e.g. path + '/Workspace/'),
# so the concatenated paths contain a double slash.
path = '/mnt/DAP/data/ColombiaProject-TransMilenioRawData/'

In [0]:
# Check if the given file path already exists on DBFS
def dbfs_file_exists(path):
  """Tell whether `path` can be listed with `dbutils.fs.ls`.

  Returns True when the listing succeeds and False when it fails with an
  error whose text contains 'java.io.FileNotFoundException'. Any other
  exception is re-raised unchanged.
  """
  try:
    dbutils.fs.ls(path)
  except Exception as err:
    # Only a "file not found" failure means the path is absent; anything else is a real error
    if 'java.io.FileNotFoundException' not in str(err):
      raise
    return False
  return True

In [0]:
# Trying to export from DBFS to local machine
#dbutils.fs.put("/FileStore/my-stuff/my-file.txt", "This is the actual text that will be saved to disk. Like a 'Hello world!' example")

## 1. Create Workspace
All files will be transferred there from:
- Ingestion point for files we had on OneDrive: /mnt/DAP/data/ColombiaProject-TransMilenioRawData/Documents
- Download point for files fetched from TM Google API: /mnt/DAP/data/ColombiaProject-TransMilenioRawData/Data

In [0]:
# Make sure the Workspace folder and its Raw / Clean / variable_dicts subfolders exist
workspace_dir = path + '/Workspace/'
dbutils.fs.mkdirs(workspace_dir)
dbutils.fs.mkdirs(workspace_dir + 'Raw/')
dbutils.fs.mkdirs(workspace_dir + 'Clean/')
dbutils.fs.mkdirs(workspace_dir + 'variable_dicts/')

In [0]:
# Show the current contents of the Workspace folder
dbutils.fs.ls(path + '/Workspace/')

In [0]:
# Show the current contents of the Workspace/Clean folder
dbutils.fs.ls(path + '/Workspace/Clean/')

`/Workspace/Raw/` 
is the single folder that stores all raw data. Its subfolders still follow different file structures:

- `/since2020`: data since 2020 is organized in ValidacionTroncal, ValidacionZonal, ValidacionDual, Salidas, Recargas folders. 

  - Recargas
  - Salidas
  - ValidacionDual
  - ValidacionTroncal
  - ValidacionZonal


- `/2017`
  [TO REORGANIZE]

- `/byheader_dir`: raw validaciones files organized in folders by header

`/Workspace/variable_dicts/` 

## 2. Moving 2017 data [TO REORGANIZE]

All of these data come from the Ingestion point.

In [0]:
# Where the 2017 files were ingested, and where their raw copies will be stored
ingestion2017_dir = f'{path}/Documents/2017data'
raw2017_dir = f'{path}/Workspace/Raw/2017'
dbutils.fs.mkdirs(raw2017_dir)

# Display the names of the entries found in the 2017 ingestion folder
files = dbutils.fs.ls(ingestion2017_dir)
[entry.name for entry in files]

**Moving individual files from Oct, Nov, and Dec 2017 to Raw/2017 folder:**
- Troncal Dec: extracted with patool from the .7z file
- Troncal and Zonal Oct, Zonal Dec: moved from the individual decompressed folders
- Troncal and Zonal Nov: extracted from the decompressed folder, also using patool
  - _Note: valzonal_27nov2017_MCKENNEDY.gz is corrupted and cannot be extracted_ 

In [0]:
# Unpack the December Troncal archive (.7z) into the raw 2017 folder
f = 'ValTroncal Dic2017.7z'
troncal_dec_archive = Archive(f"/dbfs{ingestion2017_dir}/{f}")
troncal_dec_archive.extractall(f"/dbfs{raw2017_dir}")

In [0]:
# The remaining months come from the "decompressed" folder: list its subfolders, then what each one holds
decompressed_root = ingestion2017_dir + "/decompressed"
decompressed = [entry.name for entry in dbutils.fs.ls(decompressed_root)]
print(decompressed)

for folder in decompressed:
    print("---------------")
    print(folder, ":")
    folder_entries = dbutils.fs.ls(decompressed_root + "/" + folder)
    print([entry.name for entry in folder_entries])

In [0]:
# Every folder except November's holds files that can be copied as they are
for folder in ['ValTroncal Oct2017/', 'ValZonal Dic2017/', 'ValZonal Oct2017/']:
    source_dir = ingestion2017_dir + "/decompressed/" + folder
    files = [entry.name for entry in dbutils.fs.ls(source_dir)]
    for f in tqdm(files):
        # NOTE(review): the destination is the 2017 folder itself, not a full file path —
        # confirm dbutils.fs.cp places the file inside that folder
        dbutils.fs.cp(source_dir + f, raw2017_dir)

In [0]:
# November's archives sit inside subfolders of each decompressed folder;
# unpack every one of them into the raw 2017 folder
for folder in ['ValTroncal Nov2017/', 'ValZonal Nov2017/']:
    d = f"/dbfs{ingestion2017_dir}/decompressed/{folder}"
    subfolders = [d + entry for entry in os.listdir(d)]
    for fd in tqdm(subfolders):
        files = os.listdir(fd)
        for f in files:
            # This archive is corrupted and cannot be extracted, so it is left out
            if f == 'valzonal_27nov2017_MCKENNEDY.gz':
                continue
            Archive(f"{fd}/{f}").extractall(f"/dbfs{raw2017_dir}")
    

In [0]:
# Unpack a single November archive (ETIB, 16 Nov 2017) on its own, then load the extracted file with pandas
etib_subfolder = "/decompressed/ValZonal Nov2017/16. valzonal_16nov2017/"
f = f"/dbfs{ingestion2017_dir}{etib_subfolder}valzonal_16nov2017_ETIB.gz"
Archive(f).extractall(f"/dbfs{raw2017_dir}")
df = pd.read_csv(
    '/dbfs/mnt/DAP/data/ColombiaProject-TransMilenioRawData/Workspace/Raw/2017/valzonal_16nov2017_ETIB'
)

## 3. Moving data since 2020
Data is moved to `Workspace/Raw/since2020` from both the Ingestion point (Documents folder) and the Download point (Data folder). The folder follows the same structure as the Data folder.

In [0]:
# Create the since2020 raw folder together with one subfolder per kind of data
raw2020_dir = f'{path}/Workspace/Raw/since2020/'
dbutils.fs.mkdirs(raw2020_dir)

since2020_subdirs = ['Recargas/', 'Salidas/', 'ValidacionDual/', 'ValidacionTroncal/', 'ValidacionZonal/']
for d in since2020_subdirs:
    dbutils.fs.mkdirs(f'{raw2020_dir}{d}')

# Show what the ValidacionZonal subfolder currently contains
os.listdir('/dbfs/mnt/DAP/data/ColombiaProject-TransMilenioRawData/Workspace/Raw/since2020/ValidacionZonal')

### Ingestion point (Documents folder)

In [0]:
# Show the folders available at the ingestion point (Documents)
os.listdir('/dbfs' + path + '/Documents/')


In [0]:
# Ingestion folders to inspect, grouped by kind
folders = [
    "Zonal2023", "Zonal2022", "Zonal2021", "Zonal2020",
    "Troncal2023", "Troncal2022", "Troncal2021", "Troncal2020",
    "Dual2023", "Dual2022", "Dual2021", "Dual2020",
    "salidas2023",
]

# Print how many entries each folder holds; once the loop ends, `files` keeps
# the listing of the last folder
documents_root = '/mnt/DAP/data/ColombiaProject-TransMilenioRawData/Documents/'
for f in folders:
    files = dbutils.fs.ls(documents_root + f)
    print(f, "-", len(files))

In [0]:
# Names of the entries in `files` (the most recent dbutils.fs.ls listing).
# The `name` field is used instead of slicing the full path at a hard-coded offset (77),
# which only strips the folder prefix when that prefix has exactly that length.
names = [f.name for f in files]
# check for duplicates: the first 15 characters of every name must be distinct.
# The distinct prefixes are counted with a set; comparing the lengths of two lists
# built from the same entries would always print True.
rawnames = [n[:15] for n in names]
print(len(set(rawnames)) == len(names))

### Download point (Data folder)

In [0]:
# Show the folders available at the download point (Data)
os.listdir('/dbfs' + path + '/Data/')

In [0]:
# Show the first 10 entries of Data/Recargas
os.listdir('/dbfs' + path + '/Data/Recargas/')[0:10]

In [0]:
# Show everything inside Data/Recargas/2023
os.listdir('/dbfs' + path + '/Data/Recargas/2023')

In [0]:
# Show the first 30 entries of Data/ValidacionZonal
os.listdir('/dbfs' + path + '/Data/ValidacionZonal')[0:30]
# Alternative view (disabled here): list only the 2024 subfolder
# os.listdir('/dbfs' + path + '/Data/ValidacionZonal/2024')

In [0]:
# Show everything inside Data/ValidacionZonal/2024
os.listdir('/dbfs' + path + '/Data/ValidacionZonal/2024')

### Move

#### 1. Move from download point (Data folder) [this should be moved to the Mondays Job]


In [0]:
# Destination root for the data since 2020
raw2020_dir = f'{path}/Workspace/Raw/since2020/'

# Validaciones: for each folder, copy the files whose name contains 'validacion'
# and that are not yet present in the workspace
for d in ('ValidacionDual/', 'ValidacionTroncal/', 'ValidacionZonal/'):
    files = [entry.name for entry in dbutils.fs.ls(f'{path}/Data/{d}')]
    vfiles = [name for name in files if 'validacion' in name]
    print(len(vfiles))

    for f in tqdm(vfiles):
        origin = f'{path}/Data/{d}{f}'
        target = f'{raw2020_dir}{d}{f}'
        # Files already present at the target are left untouched
        if dbfs_file_exists(target):
            continue
        dbutils.fs.cp(origin, target)
        print(f'{target} COPIED')
            print(f'{target} COPIED')

#### 2. Move from ingestion point (Documents folder)

In [0]:
# Map every ingestion folder ("<Kind><year>/") to the "Validacion<Kind>/" folder it belongs to.
# Keys are generated kind by kind, newest year first.
dic_d = {
    f"{kind}{year}/": f"Validacion{kind}/"
    for kind in ("Zonal", "Troncal", "Dual")
    for year in (2023, 2022, 2021, 2020)
}

In [0]:
# Copy the 'validacion' files of each ingestion folder listed in dic_d into the matching
# since2020 subfolder. Here `df` is the destination folder name (a string), not a DataFrame.
for d, df in dic_d.items():
    source_dir = path + "/Documents/" + d
    files = [entry.name for entry in dbutils.fs.ls(source_dir)]
    vfiles = [name for name in files if 'validacion' in name]
    print(len(vfiles))

    for f in tqdm(vfiles):
        dbutils.fs.cp(source_dir + f, path + '/Workspace/Raw/since2020/' + df + f)
    

### Check files in /Workspace/Raw/since2020/
