In [3]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
import os
import re
from urllib.parse import urljoin
import time
import string
from dotenv import load_dotenv, find_dotenv

In [11]:
load_dotenv(find_dotenv())

True

In [12]:
KAFKA_BOOTSTRAP_SERVER = os.getenv("KAFKA_BOOTSTRAP_SERVER")

KAFKA_REST_ENDPOINT = os.getenv("KAFKA_REST_ENDPOINT")

KAFKA_TOPIC = os.getenv("KAFKA_TOPIC")

DIRECTORY_TO_WATCH = os.getenv("WATCH_DIRECTORY")

SERVICE_ACCOUNT_KEY = os.getenv("GCS_KEY_PATH")

GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")

GCS_BUCKET_NAME = os.getenv("GCS_BUCKET_NAME")

In [7]:
# Specify the Metadata File Path
file_path = './data/moa_metadata.csv'

# Read the CSV file
moa_metadata = pd.read_csv(file_path)

# Extract the MOA_URL links into a list
moa_url_list = moa_metadata['Page_URL'].tolist()

In [8]:
moa_url_list

['https://www.sanjoseca.gov/your-government/departments-offices/office-of-the-city-manager/employee-relations/labor-relations-information/bargaining-units-labor-contract-info/abmei',
 'https://www.sanjoseca.gov/your-government/departments-offices/office-of-the-city-manager/employee-relations/labor-relations-information/bargaining-units-labor-contract-info/aea',
 'https://www.sanjoseca.gov/your-government/departments-offices/office-of-the-city-manager/employee-relations/labor-relations-information/bargaining-units-labor-contract-info/alp',
 'https://www.sanjoseca.gov/your-government/departments-offices/office-of-the-city-manager/employee-relations/labor-relations-information/bargaining-units-labor-contract-info/amsp',
 'https://www.sanjoseca.gov/your-government/departments-offices/office-of-the-city-manager/employee-relations/labor-relations-information/bargaining-units-labor-contract-info/camp',
 'https://www.sanjoseca.gov/your-government/departments-offices/office-of-the-city-manager/

In [52]:
# Function to sanitize filenames
def sanitize_filename(filename):
    valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
    return ''.join(c for c in filename if c in valid_chars)

In [53]:
# Function to download PDF from a given URL
def download_pdf(url, base_url, save_path, session):
    url = urljoin(base_url, url)
    filename = sanitize_filename(save_path) + '.pdf'
    full_save_path = os.path.join(pdf_directory, filename)

    response = session.get(url, stream=True)
    if response.status_code == 200:
        with open(full_save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"PDF downloaded: {full_save_path}")
    else:
        print(f"Failed to download PDF from {url} - Status Code: {response.status_code}")

In [54]:
# Directory to save PDFs
pdf_directory = './data/'
if not os.path.exists(pdf_directory):
    os.makedirs(pdf_directory)

# Base URL
base_url = 'https://www.sanjoseca.gov'

In [55]:
# Start a session for requests
session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
})

# Regex pattern for matching 'Union Contract' in a case-insensitive manner
pattern = re.compile('union contract', re.IGNORECASE)

# Iterate over each MOA_URL
for moa_url in moa_url_list:
    try:
        response = session.get(moa_url)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')

            # Find all potential headings or paragraphs
            potential_headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p'], string=pattern)

            for heading in potential_headings:
                next_siblings = heading.find_next_siblings(['p', 'ul'])
                for sibling in next_siblings:
                    if sibling.name == 'ul':
                        for li in sibling.find_all('li'):
                            a_tag = li.find('a')
                            if a_tag and a_tag.get('href'):
                                pdf_url = a_tag.get('href')
                                pdf_text = a_tag.get_text(strip=True)
                                download_pdf(pdf_url, base_url, pdf_text, session)
                        break  # Break after processing each <ul> associated with the heading

            # Sleep between requests
            time.sleep(1)
        else:
            print(f"Failed to get {moa_url} - Status Code: {response.status_code}")
            time.sleep(5)
    except Exception as e:
        print(f"Error processing {moa_url}: {e}")
        time.sleep(10)

PDF downloaded: ./data/Association of Building Mechanical and Electrical Inspectors (ABMEI) MOA.pdf
PDF downloaded: ./data/Association of Building Mechanical and Electrical Inspectors (ABMEI) MOA.pdf
PDF downloaded: ./data/Association of Engineers and Architects IFTPE Local 21 Units 4142 MOA.pdf
PDF downloaded: ./data/Association of Engineers and Architects IFPTE Local 21 Unit 43 MOA.pdf
PDF downloaded: ./data/Association of Legal Professionals of San Jose (ALP).pdf
PDF downloaded: ./data/Association of Legal Professionals of San Jose (ALP).pdf
PDF downloaded: ./data/Association of Maintenance Supervisory Personnel IFPTE Local 21 (AMSP) MOA.pdf
PDF downloaded: ./data/City Association of Management Personnel IFPTE Local 21 (CAMP) MOA.pdf
PDF downloaded: ./data/San Jos Fire Fighters - International Association of Firefighters (IAFF) Local 230 MOA.pdf
PDF downloaded: ./data/San Jos Fire Fighters - International Association of Firefighters (IAFF) Local 230 MOA.pdf
PDF downloaded: ./data/In

In [13]:
!pip install watchdog confluent-kafka



In [1]:
!pip install google-cloud-storage

Collecting google-cloud-storage
  Downloading google_cloud_storage-2.13.0-py2.py3-none-any.whl.metadata (6.1 kB)
Collecting google-auth<3.0dev,>=2.23.3 (from google-cloud-storage)
  Downloading google_auth-2.23.4-py2.py3-none-any.whl.metadata (4.7 kB)
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5 (from google-cloud-storage)
  Downloading google_api_core-2.14.0-py3-none-any.whl.metadata (2.6 kB)
Collecting google-cloud-core<3.0dev,>=2.3.0 (from google-cloud-storage)
  Downloading google_cloud_core-2.3.3-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting google-resumable-media>=2.6.0 (from google-cloud-storage)
  Downloading google_resumable_media-2.6.0-py2.py3-none-any.whl.metadata (2.1 kB)
Collecting google-crc32c<2.0dev,>=1.0 (from google-cloud-storage)
  Downloading google_crc32c-1.5.0-cp311-cp311-win_amd64.whl (27 kB)
Collecting googleapis-common-protos<2.0.dev0,>=1.56.2 (from google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5->google-clo

In [15]:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from google.cloud import storage
from google.oauth2 import service_account

class Watcher:
    DIRECTORY_TO_WATCH = DIRECTORY_TO_WATCH

    def __init__(self):
        self.observer = Observer()
        self.running = True

    def run(self):
        event_handler = Handler()
        self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=True)
        self.observer.start()

        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            self.stop()
        except Exception as e:
            print(f"An error occurred: {e}")
            self.stop()

    def stop(self):
        self.observer.stop()
        self.observer.join()
        self.running = False
        print("Watcher stopped")

class Handler(FileSystemEventHandler):
    @staticmethod
    def on_any_event(event):
        if event.is_directory or not event.event_type == 'created':
            return None

        if event.src_path.endswith('.pdf'):
            print(f"New PDF detected: {event.src_path}")
            upload_to_gcs(event.src_path)

def upload_to_gcs(file_path):
    # Configure GCS client
    credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_KEY)
    client = storage.Client(credentials=credentials, project=GCP_PROJECT_ID)

    bucket_name = GCS_BUCKET_NAME
    bucket = client.bucket(bucket_name)

    # Set the name of the file in the bucket
    blob = bucket.blob(file_path.split('\\')[-1])

    try:
        blob.upload_from_filename(file_path)
        print(f'File {file_path} uploaded to {bucket_name}.')
    except Exception as e:
        print(f'Failed to upload {file_path} to GCS: {e}')

if __name__ == '__main__':
    w = Watcher()
    w.run()

New PDF detected: C:/Users/koush/Synthia_Anaconda/src/synthia/notebooks/data\Article 39 - Exempt Officers and Sergeants Modified Duty Program.pdf
File C:/Users/koush/Synthia_Anaconda/src/synthia/notebooks/data\Article 39 - Exempt Officers and Sergeants Modified Duty Program.pdf uploaded to synthia_source_data.
New PDF detected: C:/Users/koush/Synthia_Anaconda/src/synthia/notebooks/data\Association of Building Mechanical and Electrical Inspectors (ABMEI) MOA.pdf
File C:/Users/koush/Synthia_Anaconda/src/synthia/notebooks/data\Association of Building Mechanical and Electrical Inspectors (ABMEI) MOA.pdf uploaded to synthia_source_data.
New PDF detected: C:/Users/koush/Synthia_Anaconda/src/synthia/notebooks/data\Association of Engineers and Architects IFPTE Local 21 Unit 43 MOA.pdf
File C:/Users/koush/Synthia_Anaconda/src/synthia/notebooks/data\Association of Engineers and Architects IFPTE Local 21 Unit 43 MOA.pdf uploaded to synthia_source_data.
New PDF detected: C:/Users/koush/Synthia_Ana