<a href="https://colab.research.google.com/github/WalterPaixaoCortes/r3s-scripts/blob/main/notebooks/Data_Exploration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ECPA file download

## Importing libraries

In [None]:
import os
import datetime
import zipfile
import gzip
import traceback
import glob
import gc
import logging
import sqlite3
import sys

from sqlalchemy import event
from sqlalchemy import create_engine
from logging.handlers import TimedRotatingFileHandler
from urllib.parse import urlparse
from dotenv import load_dotenv

import pandas as pd
import requests as r

from bs4 import BeautifulSoup

## Declaring auxiliary functions

In [None]:
def count_lines(file_name):
    """Return the number of lines in ``file_name`` after the first (header) line.

    The file is read as ISO-8859-1 text, which can decode any byte sequence.
    The result is therefore the number of physical lines minus one. A file
    with only a header, or an empty file, yields 0.

    Args:
        file_name: path of the text file to count.

    Returns:
        Physical line count minus one, never negative.
    """
    # Use a context manager so the handle is closed even if iteration fails.
    with open(file_name, "r", encoding="iso-8859-1") as fp:
        total_lines = sum(1 for _ in fp)
    # The original returned the last enumerate() index (lines - 1) and raised
    # UnboundLocalError on an empty file; clamp that case to 0 instead.
    return max(total_lines - 1, 0)


## Defining the parameters for execution

In [None]:
# --- Pipeline stages: each flag switches one cell of this notebook on/off ---
create_folders = False     # create the zipfiles/, rawfiles/ and database/ folders
download_files = False     # download the archives listed in download_urls
unzip_files = False        # extract .zip / .gz archives into the raw-files folder
save_to_database = True    # load the extracted .txt/.csv files into the database
validate_process = True    # compare file line counts with table row counts

# --- Database target ---
use_sqlite = False         # True: local SQLite file; False: PG_DATA_CONN engine
clean_database = True      # delete the SQLite file before loading (SQLite mode only)

# Rows per chunk read from each file and written with to_sql.
commit_size = 2000

## Defining the variables

### Load Environment Variables

In [None]:
# Load variables from a .env file into the process environment; PG_DATA_CONN
# (read when connecting to the database below) is presumably defined there.
load_dotenv()

### Initializing Logger

In [None]:
# Log to both logs/log.log (rotated at midnight, suffixed with the date) and stdout.
# FileHandler opens its file immediately, so the folder must exist first;
# otherwise this cell fails with FileNotFoundError on a fresh checkout.
os.makedirs("logs", exist_ok=True)
fhandler = TimedRotatingFileHandler("logs/log.log", when="midnight", interval=1)
fhandler.suffix = "%Y%m%d"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[fhandler, logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

### Build the download URL list

In [None]:
# Fixed ECHO bulk-download archives; TRI, WQI and DMR URLs are appended below.
download_urls = [
    "https://echo.epa.gov/files/echodownloads/frs_downloads.zip",
    "https://echo.epa.gov/files/echodownloads/case_downloads.zip",
    "https://echo.epa.gov/files/echodownloads/npdes_downloads.zip",
    "https://echo.epa.gov/files/echodownloads/npdes_eff_downloads.zip",
    "https://echo.epa.gov/files/echodownloads/npdes_master_general_permits.zip",
    "https://echo.epa.gov/files/echodownloads/npdes_outfalls_layer.zip",
    "https://echo.epa.gov/files/echodownloads/npdes_limits.zip",
    "https://echo.epa.gov/files/echodownloads/SDWA_latest_downloads.zip",
]

For TRI files, we need to add one file per year, starting from 1987.

In [None]:
# One TRI archive per year, from tri_start up to and including last year.
tri_start = 1987
tri_end = datetime.datetime.now().year - 1
tri_end_url = "https://www3.epa.gov/tri/pds/US_%s.zip"
tri_url = "https://www3.epa.gov/tri/current/US_%s.zip"

logger.info(f"Loading URLs for TRI downloads from {tri_start} to {tri_end}...")
for year in range(tri_start, tri_end + 1):
    # The final year uses a different URL template than the earlier years.
    # NOTE(review): the final year maps to `pds/` and earlier years to
    # `current/`; confirm these two templates are not swapped.
    url_template = tri_end_url if year == tri_end else tri_url
    download_urls.append(url_template % year)

For WQI files, we need to detect the correct files in the folder listing.

In [None]:
# Scrape the WQI directory listing and queue every ResultFileToEnd2Output link.
base_wqi_url = "https://echo.epa.gov/files/echodownloads/Data-Analytics/WQI"

logger.info("Loading URLs for WQI downloads...")
response = r.get(base_wqi_url)
if not response.ok:
    # Keep going (best effort), but make an empty/odd result explainable.
    logger.warning(f"WQI listing returned HTTP {response.status_code}; no WQI files may be found.")
soup = BeautifulSoup(response.content, 'html.parser')
# href=True skips anchors without an href attribute, which would otherwise
# raise KeyError on item["href"].
links = soup.find_all('a', href=True)

for item in links:
    if "ResultFileToEnd2Output" in item["href"]:
        download_urls.append(f'{base_wqi_url}/{item["href"]}')

There is a special routine for DMR files as well.

In [None]:
# Scrape the ECHO downloads listing and queue every npdes_dmrs_ link.
base_dmr_url = "https://echo.epa.gov/files/echodownloads"

logger.info("Loading URLs for DMR downloads...")
response = r.get(base_dmr_url)
if not response.ok:
    # Keep going (best effort), but make an empty/odd result explainable.
    logger.warning(f"DMR listing returned HTTP {response.status_code}; no DMR files may be found.")
soup = BeautifulSoup(response.content, 'html.parser')
# href=True skips anchors without an href attribute, which would otherwise
# raise KeyError on item["href"].
links = soup.find_all('a', href=True)

for item in links:
    if "npdes_dmrs_" in item["href"]:
        download_urls.append(f'{base_dmr_url}/{item["href"]}')

### Initializing Variables

In [None]:
# Working folders (relative to the current directory).
zipfile_folder = "zipfiles"      # downloaded archives
unzipped_folder = "rawfiles"     # extracted data files
database_folder = "database"     # SQLite file and validation reports

# Archive extension handled by the unzip step, and data-file extensions
# that the load and validation steps accept.
extension = ".zip"
allowed_extensions = [".txt", ".csv"]

# SQLite database path, used when use_sqlite is True.
database_name = f"{database_folder}/source.db"

# Database connection/engine; set in the "Connecting" step.
my_conn = None

Now, to avoid downloading files again, let's generate a list of the files that were already downloaded.

In [None]:
# Names of archives already present in the download folder; the download step
# skips them and the unzip step only extracts archives NOT in this list.
# This cell runs before the folder-creation step, so a missing folder just
# means nothing has been downloaded yet (os.listdir would raise otherwise).
# Entries are plain file names: they are compared against the basename the
# download step derives from each URL, which is also the name it saves under.
if os.path.isdir(zipfile_folder):
    downloaded_files = os.listdir(zipfile_folder)
else:
    downloaded_files = []

## Defining the environment

In [None]:
# Create the working folders on request; otherwise assume they already exist.
if not create_folders:
    logger.info("Folders already created...")
else:
    for folder_path in (zipfile_folder, unzipped_folder, database_folder):
        if not os.path.exists(folder_path):
            os.mkdir(folder_path)

## Cleaning up database

In [None]:
# In SQLite mode with clean_database set, delete the database file so the
# load starts from an empty database; any other combination keeps it.
if not (use_sqlite and clean_database):
    logger.info("Database will be used as is...")
else:
    logger.info ("Cleaning database to restart insert operation...")
    if os.path.exists(database_name):
        # Close any open handle before deleting the file.
        if my_conn:
            my_conn.close()
        os.remove(database_name)

## Connecting or Creating database

In [None]:
# Open the target database. SQLite mode uses a sqlite3 connection to
# database_name; otherwise a SQLAlchemy engine is built from PG_DATA_CONN.
my_conn = None
if use_sqlite:
    my_conn = sqlite3.connect(database_name)
else:
    pg_conn_string = os.getenv("PG_DATA_CONN")
    if not pg_conn_string:
        raise RuntimeError("PG_DATA_CONN is not set; define it in the environment or in the .env file.")
    my_conn = create_engine(pg_conn_string)
    # Log the engine URL with the password masked (repr of a SQLAlchemy URL
    # hides it) instead of the raw string, which would write the database
    # credentials to stdout and logs/log.log.
    logger.info(repr(my_conn.url))
logger.info("Connected to database...")

## Download zip files

In [None]:
# Download every queued archive that is not already in the download folder.
if download_files:
    logger.info(f"Starting download process. Total files to be downloaded: {len(download_urls)}...")
    for download_url in download_urls:
        file_name = os.path.basename(urlparse(download_url).path)
        if file_name in downloaded_files:
            continue
        logger.info(f"Downloading file {file_name}...")
        target_path = os.path.join(zipfile_folder, file_name)
        try:
            # Stream the body to disk in 1 MiB pieces instead of buffering the
            # whole archive in memory; the timeout bounds each connect/read wait.
            with r.get(download_url, allow_redirects=True, stream=True, timeout=60) as response:
                # Without this check an HTTP error page would be saved under
                # the archive's name and only fail later, at unzip time.
                response.raise_for_status()
                with open(target_path, "wb") as fw:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        fw.write(chunk)
            logger.info(f"--> File {file_name} saved.")
        except Exception:
            logger.error(f"--> File {file_name} not downloaded.")
            logger.error(traceback.format_exc())
            # Drop a partially written archive so a later run retries it
            # instead of treating it as already downloaded.
            if os.path.exists(target_path):
                os.remove(target_path)
else:
    logger.info("Files already downloaded...")


## Unzip the files

In [None]:
# Extract .zip archives and decompress .gz files from the download folder into
# the raw-files folder.
if unzip_files:
    import shutil  # streaming copy for .gz decompression

    for item in os.listdir(zipfile_folder):
        # NOTE(review): archives listed in `downloaded_files` (present before
        # this run) are skipped, so only archives downloaded in this run are
        # extracted -- confirm that is intended when re-running only this step.
        if item.endswith(extension) and item not in downloaded_files:
            logger.info(f"Unzipping file {item}...")
            try:
                file_name = os.path.abspath(os.path.join(zipfile_folder, item))
                # Context manager closes the archive even if extraction fails.
                with zipfile.ZipFile(file_name) as zip_ref:
                    zip_ref.extractall(unzipped_folder)
                logger.info(f"--> File {item} unzipped.")
            except Exception:
                logger.error(f"--> File {item} not unzipped.")
        elif item.endswith(".gz") and item not in downloaded_files:
            logger.info(f"Decompressing file {item}...")
            file_name = os.path.abspath(os.path.join(zipfile_folder, item))
            # Strip only the trailing ".gz" (str.replace would remove every occurrence).
            new_file_name = os.path.abspath(os.path.join(unzipped_folder, item[:-len(".gz")]))
            partial_name = new_file_name + ".part"
            try:
                # Decompress in a streaming fashion into a temporary name, then
                # move it into place, so a failure never leaves a truncated
                # output under the final name.
                with gzip.open(file_name, "rb") as fr, open(partial_name, "wb") as fw:
                    shutil.copyfileobj(fr, fw)
                os.replace(partial_name, new_file_name)
                logger.info(f"File {new_file_name} decompressed and saved...")
            except Exception:
                logger.error(f"--> File {item} not decompressed.")
                if os.path.exists(partial_name):
                    os.remove(partial_name)
        else:
            logger.info(f"Skipping file {item}.")
else:
    logger.info("Files already unzipped...")

## Save to database

In [None]:
def _load_delimited_file(path, table_name, sep):
    """Stream one delimited text file into table ``table_name`` (schema ``source``).

    The file is read as ISO-8859-1 in chunks of ``commit_size`` rows, with every
    column as ``str``. Malformed lines are skipped. The first chunk is written
    with ``if_exists="replace"`` and later chunks are appended. A failed earlier
    attempt (e.g. the comma-separated try before the tab fallback) or a previous
    run therefore never leaves its rows mixed into the new load.

    Args:
        path: path of the file to load.
        table_name: destination table name.
        sep: column delimiter passed to ``pandas.read_csv``.

    Raises:
        Any exception from ``pandas.read_csv`` or ``DataFrame.to_sql``; the
        caller decides how to handle it.
    """
    # The context manager closes the underlying file even if a chunk fails.
    with pd.read_csv(path, encoding="iso-8859-1", sep=sep, index_col=False,
                     chunksize=commit_size, dtype=str, on_bad_lines="skip",
                     encoding_errors="replace") as reader:
        for chunk_number, chunk in enumerate(reader):
            chunk.to_sql(table_name, my_conn, schema="source",
                         if_exists="replace" if chunk_number == 0 else "append",
                         index=False)


# Load every extracted .txt/.csv file (smallest first) into a table named after
# the file. US* files are read as tab-separated. Other files are read as
# comma-separated first, then retried as tab-separated if that fails.
# ResultFile* files are not loaded.
if save_to_database:
    logger.info("Preparing list of files to be processed...")
    list_of_files = filter(os.path.isfile, glob.glob(unzipped_folder + '/*'))
    list_of_files = sorted(list_of_files, key=lambda x: os.stat(x).st_size)
    files = [os.path.basename(item) for item in list_of_files]

    for item in files:
        table_name, file_ext = os.path.splitext(os.path.basename(item))
        if file_ext in allowed_extensions and not table_name.startswith("ResultFile"):
            source_path = os.path.join(unzipped_folder, item)
            use_tab = table_name.startswith("US")
            if not use_tab:
                try:
                    _load_delimited_file(source_path, table_name, ",")
                    logger.info(f"File {item} saved on the database...")
                except Exception:
                    logger.error(traceback.format_exc())
                    use_tab = True
            if use_tab:
                try:
                    _load_delimited_file(source_path, table_name, "\t")
                    logger.info(f"File {item} saved on the database...")
                except Exception:
                    logger.error(f"File {item} not saved on the database...")
                    logger.error(traceback.format_exc())
        gc.collect()

else:
    logger.info("Database already loaded...")

## Validating Load Process

Here we check, based on line counts, whether loading the files into the database was successful.

In [None]:
def _count_table_rows(table_name):
    """Return ``count(*)`` of the table the load step created for a file.

    The name is double-quoted because the load step uses the file's exact
    (possibly mixed-case, e.g. ``US_1987``) name. PostgreSQL would case-fold an
    unquoted identifier. In PostgreSQL mode the table lives in the ``source``
    schema passed to ``to_sql``, and the query runs through ``Engine.connect()``
    and ``text()``. In SQLite mode ``my_conn`` is a ``sqlite3`` connection and
    the name is used without a schema, as in the original query.
    """
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    if use_sqlite:
        return my_conn.execute(f"select count(*) from {quoted_table}").fetchone()[0]
    from sqlalchemy import text
    with my_conn.connect() as conn:
        return conn.execute(text(f'select count(*) from "source".{quoted_table}')).scalar()


# Compare each loaded file's data-line count with its table's row count and
# write the matches/mismatches as markdown reports into the database folder.
if validate_process:
    report_columns = ["File", "File Lines", "Table", "Table Rows", "Difference"]
    success_data = {column: [] for column in report_columns}
    error_data = {column: [] for column in report_columns}

    logger.info("Preparing list of files to be processed...")
    list_of_files = filter(os.path.isfile, glob.glob(unzipped_folder + '/*'))
    list_of_files = sorted(list_of_files, key=lambda x: os.stat(x).st_size)
    files = [os.path.basename(item) for item in list_of_files]
    for item in files:
        try:
            file_name = os.path.join(unzipped_folder, item)
            table_name, file_ext = os.path.splitext(os.path.basename(item))
            # Only compare files the load step writes to the database; other
            # extensions and ResultFile* files never get a table.
            if file_ext not in allowed_extensions or table_name.startswith("ResultFile"):
                continue
            logger.info(f"{file_name} and {table_name} being compared...")
            file_count = count_lines(file_name)
            db_count = _count_table_rows(table_name)
            diff = file_count - db_count
            # Fewer table rows than data lines means some lines were not loaded.
            report = error_data if diff > 0 else success_data
            row = (file_name, file_count, table_name, db_count, diff)
            for column, value in zip(report_columns, row):
                report[column].append(value)
        except Exception:
            logger.error(traceback.format_exc())

    logger.info("Saving results as files...")
    os.makedirs(database_folder, exist_ok=True)
    error_report = pd.DataFrame(error_data)
    error_report.to_markdown(os.path.join(database_folder, "issues.md"))

    success_report = pd.DataFrame(success_data)
    success_report.to_markdown(os.path.join(database_folder, "success.md"))


## Closing the Database Connection

In [None]:
# Release the database resources. A sqlite3 connection is closed. A SQLAlchemy
# Engine has no close() method; dispose() releases its pooled connections.
if use_sqlite:
    my_conn.close()
else:
    my_conn.dispose()