Web Scraper Assignment #124

Open: wants to merge 2 commits into main
1 change: 1 addition & 0 deletions .gitignore
@@ -127,3 +127,4 @@ dmypy.json

# Pyre type checker
.pyre/
.idea
10 changes: 0 additions & 10 deletions .vscode/settings.json

This file was deleted.

124 changes: 124 additions & 0 deletions e_tenders_india/deploy/prod/dag.yaml
@@ -0,0 +1,124 @@
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: eti-cron-wf
  namespace: %ENV%
spec:
  serviceAccountName: workflow
  schedule: "30 7 * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  successfulJobsHistoryLimit: 10
  suspend: true
  workflowSpec:
    serviceAccountName: workflow
    ttlStrategy:
      secondsAfterCompletion: 1800 # Time to live after workflow is completed, replaces ttlSecondsAfterFinished
      secondsAfterSuccess: 1800
    entrypoint: diamond
    onExit: exit-handler
    templates:
      - name: exit-handler
        steps:
          - - name: success
              template: success
              when: "{{workflow.status}} == Succeeded"
            - name: failure
              template: failure
              when: "{{workflow.status}} != Succeeded"
      - name: success
        container:
          env:
            - name: SLACK_WEBHOOK
              valueFrom:
                secretKeyRef:
                  key: url
                  name: slack-webhook
          image: 965994533236.dkr.ecr.eu-west-3.amazonaws.com/slack_bot_dags:v0.0.1
          command: ["python3"]
          args: [
            "main.py",
            "{{workflow.name}}",
            "{{workflow.namespace}}", "{{workflow.scheduledTime}}",
            "{{workflow.status}}", "{{workflow.duration}}",
          ]
      - name: failure
        container:
          env:
            - name: SLACK_WEBHOOK
              valueFrom:
                secretKeyRef:
                  key: url
                  name: slack-webhook
          image: 965994533236.dkr.ecr.eu-west-3.amazonaws.com/slack_bot_dags:v0.0.1
          command: ["python3"]
          args: [
            "main.py",
            "{{workflow.name}}",
            "{{workflow.namespace}}", "{{workflow.scheduledTime}}",
            "{{workflow.status}}", "{{workflow.duration}}",
            "{{workflow.failures}}"
          ]
      - name: eti
        inputs:
          parameters:
            - name: step
        container:
          resources:
            requests:
              cpu: 4000m
              memory: 16Gi
          image: 965994533236.dkr.ecr.eu-west-3.amazonaws.com/projects_e_tenders_india/ingestion:%TAG%
          command: ["python3", "client.py", "--step", "{{inputs.parameters.step}}"]
          env:
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-key
                  key: username
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: aws-key
                  key: password
            - name: ELASTICSEARCH_URL
              valueFrom:
                secretKeyRef:
                  name: elastic-key
                  key: url
            - name: ELASTICSEARCH_USER
              valueFrom:
                secretKeyRef:
                  name: elastic-key
                  key: username
            - name: ELASTICSEARCH_PASS
              valueFrom:
                secretKeyRef:
                  name: elastic-key
                  key: password
      - name: diamond
        dag:
          tasks:
            - name: MainScraper
              template: eti
              arguments:
                parameters: [{name: step, value: 1}]
            - name: Cleaner
              dependencies: [MainScraper]
              template: eti
              arguments:
                parameters: [{name: step, value: 2}]
            - name: Geocoder
              dependencies: [Cleaner]
              template: eti
              arguments:
                parameters: [{name: step, value: 3}]
            - name: Standardiser
              dependencies: [Geocoder]
              template: eti
              arguments:
                parameters: [{name: step, value: 4}]
            - name: ElasticSearchIngestion
              dependencies: [Standardiser]
              template: eti
              arguments:
                parameters: [{name: step, value: 5}]
1 change: 1 addition & 0 deletions e_tenders_india/src/.dockerignore
@@ -0,0 +1 @@
**/*.pyc
7 changes: 7 additions & 0 deletions e_tenders_india/src/Dockerfile
@@ -0,0 +1,7 @@
FROM python:3.8-slim-buster

COPY . .

RUN pip3 install --default-timeout=1000 -r requirements.txt

ENTRYPOINT ["python3"]
63 changes: 63 additions & 0 deletions e_tenders_india/src/client.py
@@ -0,0 +1,63 @@
import os
import dotenv
import logging

from datetime import datetime

from dependencies.utils import load_config_yaml  # type: ignore
from dependencies.scraping.eTendersIndiaMetadataScraper import eTendersIndiaMetadataScraper  # type: ignore
from dependencies.cleaning.eTendersIndia import clean_eTendersIndia  # type: ignore
from dependencies.geocoding.etendersindia_geocode import eTendersIndiaGeocoder  # type: ignore
from dependencies.standardisation.eTendersIndiaStandardiser import eTendersIndiaStandardiser  # type: ignore
from dependencies.es_ingestion.etenders_ingestion import eTendersIndiaIngestion

dotenv.load_dotenv(".env")
logging.basicConfig(level=logging.INFO)


def step_1():
    path_config = load_config_yaml()["paths"]["ETENDERSINDIA"]
    eTendersIndiaMetadataScraper(config=path_config).run()
    logging.info("Scraped Metadata and Data")


def step_2():
    clean_eTendersIndia().run()
    logging.info("Cleaned Main Data")


def step_3():
    path_config = load_config_yaml()["paths"]["ETENDERSINDIA"]
    APIKEY = os.getenv("POSITION_STACK_API_KEY")
    eTendersIndiaGeocoder(config=path_config, api_key=APIKEY).run()
    logging.info("Geocoded Cleaned Data")


def step_4():
    path_config = load_config_yaml()["paths"]["ETENDERSINDIA"]
    path_config["columns"] = ["tender_category", "sub_category", "product_category"]
    eTendersIndiaStandardiser(config=path_config).run()
    logging.info("Standardised Cleaned Data")


def step_5():
    path_config = load_config_yaml()["paths"]["ETENDERSINDIA"]
    eTendersIndiaIngestion(config=path_config).run()
    logging.info("Ingested into ElasticSearch")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--step", help="step to be chosen for execution")

    args = parser.parse_args()

    eval(f"step_{args.step}()")

    logging.info(
        {
            "last_executed": str(datetime.now()),
            "status": "Pipeline executed successfully",
        }
    )
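Review note: the step dispatch above uses eval, which executes whatever string is passed via --step. A minimal sketch of a stricter alternative is shown below; it assumes the same step_1 through step_5 functions from client.py and is illustrative only, not part of this PR.

    # Hypothetical replacement for the eval() dispatch; assumes step_1 .. step_5 exist.
    STEPS = {
        "1": step_1,
        "2": step_2,
        "3": step_3,
        "4": step_4,
        "5": step_5,
    }

    def run_step(step: str) -> None:
        try:
            # Look up the requested step and execute it; anything outside 1-5 is rejected.
            STEPS[step]()
        except KeyError:
            raise SystemExit(f"Unknown step {step!r}; expected one of {sorted(STEPS)}")

Usage would then be run_step(args.step) in place of the eval call, so an unexpected argument fails fast instead of being evaluated as code.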
138 changes: 138 additions & 0 deletions e_tenders_india/src/dependencies/cleaning/BaseCleaner.py
@@ -0,0 +1,138 @@
import traceback

from ..utils import load_config_yaml
from ..utils.bucket import (
    connect_to_buffer_bucket,
    push_csv_to_buffer_bucket,
    read_csv_from_buffer_bucket,
    read_excel_from_buffer_bucket,
    read_stata_from_buffer_bucket,
)


class BaseCleaner:
    """
    Abstract cleaner class; subclasses implement `clean` and `run`.
    """

    def __init__(self):
        self.off2short_dict = {}
        self.country_name2code_dict = {}
        self.country_code2region_name_dict = {}
        self.region_name2code_dict = {}
        self.column_rename_dict = {}

        self.base_data = None
        self.raw_data = None

        self.bucket = connect_to_buffer_bucket()
        self.config = load_config_yaml()
        self.load_conversion_dictionaries(self.config["location_mapping_paths"])

    def load_data(self, paths):
        try:
            if paths["base_data_path"] != "None":
                # self.base_data = pd.read_csv(paths["base_data_path"])
                self.base_data = read_csv_from_buffer_bucket(
                    self.bucket, paths["base_data_path"]
                )
        except Exception as e:
            print(f"Error reading base data\n{e}\n")

        try:
            ext = paths["master_data_path"].split(".")[-1]
            if ext == "csv":
                # self.raw_data = pd.read_csv(paths["raw_data_path"])
                self.raw_data = read_csv_from_buffer_bucket(
                    self.bucket, paths["master_data_path"]
                )
            elif ext == "xlsx":
                # self.raw_data = pd.read_excel(paths["raw_data_path"])
                self.raw_data = read_excel_from_buffer_bucket(
                    self.bucket, paths["master_data_path"]
                )
            elif ext == "dta":
                # self.raw_data = pd.read_stata(paths["raw_data_path"])
                # self.column_rename_dict = pd.read_stata(paths["raw_data_path"], iterator=True).variable_labels()
                self.raw_data = read_stata_from_buffer_bucket(
                    self.bucket, paths["master_data_path"]
                )
                self.column_rename_dict = read_stata_from_buffer_bucket(
                    self.bucket, paths["master_data_path"], iterator=True
                ).variable_labels()
        except Exception as e:
            print(f"Error reading file from given path\n{e}\n")

    def load_conversion_dictionaries(self, paths):
        try:
            # off2short = pd.read_excel(paths["official_name2short_name"])
            off2short = read_excel_from_buffer_bucket(
                self.bucket, sheet_name="Sheet1", rel_path=paths["official_names"]
            )
            self.off2short_dict = dict(
                off2short[["Official Name", "Short Name"]].values.tolist()
            )
        except Exception as e:
            print("Error loading dict:", e, traceback.format_exc())

        try:
            # country_name2code = pd.read_excel(paths["country_region_incomegrp"], sheet_name="Country_income_grp")
            country_name2code = read_excel_from_buffer_bucket(
                self.bucket,
                sheet_name="Country_income_grp",
                rel_path=paths["region_and_income_group"],
            )
            self.country_name2code_dict = dict(
                country_name2code[["Country Name", "Code"]].values.tolist()
            )
        except Exception as e:
            print("Error loading dict:", e, traceback.format_exc())

        try:
            # country_code2region_name = pd.read_excel(paths["country_region_incomegrp"],
            #                                          sheet_name="Country_income_grp")
            country_code2region_name = read_excel_from_buffer_bucket(
                self.bucket,
                sheet_name="Country_income_grp",
                rel_path=paths["region_and_income_group"],
            )
            self.country_code2region_name_dict = dict(
                country_code2region_name[["Code", "Region"]].values.tolist()
            )
        except Exception as e:
            print("Error loading dict:", e, traceback.format_exc())

        try:
            # region_name2code = pd.read_excel(paths["country_region_incomegrp"], sheet_name="Region_codes")
            region_name2code = read_excel_from_buffer_bucket(
                self.bucket,
                sheet_name="Region_codes",
                rel_path=paths["region_and_income_group"],
            )
            self.region_name2code_dict = dict(
                region_name2code[["Region Name", "Region code"]].values.tolist()
            )
        except Exception as e:
            print("Error loading dict:", e, traceback.format_exc())

    def clean(self):
        """
        Unimplemented method; subclasses should clean the dataset here
        (e.g., with pandas).
        """
        return None

    def run(self):
        """
        Unimplemented method; subclasses should orchestrate loading,
        cleaning, and saving here.
        """
        return None

    def save_data(self, cleaned_data, save_2_path):
        try:
            # if not os.path.exists(save_2_path.split('/')[0]):
            #     os.makedirs(save_2_path.split('/')[0])
            push_csv_to_buffer_bucket(self.bucket, cleaned_data, save_2_path)
            # cleaned_data.to_csv(save_2_path, header=True, index=False)
        except Exception as e:
            print(f"Error saving data\n{e}\n", traceback.format_exc())