BigData

Big-Data-Project of 6870655, Semester 5, DHBW IT-Automotive 2021

Requirements / Prerequisites

Run Application

  • Clone this repo: git clone https://github.com/inf19150/BigData && cd BigData/docker
  • Run docker-compose up -d for production, or docker-compose -f docker-compose.dev.yml up -d for development
  • Hadoop is started as part of this compose stack
  • Wait until docker logs hadoop says Container Startup finished.
  • Run docker exec -it hadoop bash and execute the following inside the container's shell:
  • su hadoop && cd
  • start-all.sh
  • hiveserver2

Visit UI of components

Service                URL
NodeRed-Editor         your-host-ip:1880
NodeRed-UI (Frontend)  your-host-ip:1880/ui
AirFlow                your-host-ip:8080
HDFS                   your-host-ip:9870

Various credentials

Service  User     PW
NodeRed  admin    bigdata2021
MySQL    root     bigdata2021
MySQL    sqluser  password

DAGs & Tasks

CellCoverage initial

CellCoverage daily

Tasks of initial DAG
# Create a local directory within airflow, where the raw data gets downloaded to
create_local_import_dir = BashOperator(
    task_id='create_import_dir',
    bash_command='mkdir -p /home/airflow/ocid/raw',
    dag=initial_dag,
)


# Create a corresponding HDFS directory, to which the extracted raw data (csv file) is later uploaded
create_remote_hdfs_dir_raw = HdfsMkdirFileOperator(
    task_id='mkdir_hdfs_ocid_raw_dir',
    directory='/user/hadoop/ocid/work/',
    hdfs_conn_id='hdfs',
    dag=initial_dag,
)


# Create a corresponding HDFS directory, in which the reduced final data (as a table) is later stored
create_remote_hdfs_dir_final = HdfsMkdirFileOperator(
    task_id='mkdir_hdfs_ocid_final_dir',
    directory='/user/hadoop/ocid/final/',
    hdfs_conn_id='hdfs',
    dag=initial_dag,
)


# Download full database from ocid to local file on airflow fs
download_initial_dataset = HttpDownloadOperator(
    task_id='download_initial',
    # download_uri='https://opencellid.org/ocid/downloads?token={}&type=full&file=cell_towers.csv.gz'.format(API_KEY),
    download_uri='http://193.196.53.117/ocid/cell_towers.csv.gz',
    save_to='/home/airflow/ocid/raw/ocid_full_{{ ds }}.csv.gz',
    dag=initial_dag,
)


# Unzip the gzipped full database to a csv file on the airflow fs
unzip_initial_dataset = UnzipFileOperator(
    task_id='unzip_initial',
    zip_file='/home/airflow/ocid/raw/ocid_full_{{ ds }}.csv.gz',
    extract_to='/home/airflow/ocid/raw/ocid_full_{{ ds }}.csv',
    dag=initial_dag,
)


# Move extracted full database to remote hdfs
hdfs_put_ocid_initial = HdfsPutFileOperator(
    task_id='upload_ocid_full_hdfs',
    local_file='/home/airflow/ocid/raw/ocid_full_{{ ds }}.csv',
    remote_file='/user/hadoop/ocid/work/ocid_full_{{ ds }}.csv',
    hdfs_conn_id='hdfs',
    dag=initial_dag,
)


pyspark_ocid_full_to_final = SparkSubmitOperator(
    task_id='pyspark_filter_reduce_full_write_to_final_parquet',
    conn_id='spark',
    application='/home/airflow/airflow/python/ocid_full_to_final_db.py',
    total_executor_cores='2',
    executor_cores='2',
    executor_memory='2g',
    num_executors='2',
    name='spark_raw_to_final_full',
    verbose=True,
    application_args=[
        '--year', '{{ macros.ds_format(ds, "%Y-%m-%d", "%Y")}}',
        '--month', '{{ macros.ds_format(ds, "%Y-%m-%d", "%m")}}',
        '--day',  '{{ macros.ds_format(ds, "%Y-%m-%d", "%d")}}',
        '--hdfs_source_dir', '/user/hadoop/ocid/work/',
        '--hdfs_target_dir', '/user/hadoop/ocid/final/',
    ],
    dag=initial_dag
)
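
The Spark application referenced above, /home/airflow/airflow/python/ocid_full_to_final_db.py, is not included in this README. The following is only a minimal sketch of what such a job could look like, assuming the standard OpenCellID csv columns (radio, mcc, net, area, cell, lon, lat, range, ...) and a rough bounding box around Germany; none of these details are taken from the repository.

# Hypothetical sketch of ocid_full_to_final_db.py. The argument names follow the
# application_args above; schema, bounding box and output options are assumptions.
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

parser = argparse.ArgumentParser()
parser.add_argument('--year', required=True)
parser.add_argument('--month', required=True)
parser.add_argument('--day', required=True)
parser.add_argument('--hdfs_source_dir', required=True)
parser.add_argument('--hdfs_target_dir', required=True)
args = parser.parse_args()

spark = SparkSession.builder.appName('spark_raw_to_final_full').getOrCreate()

# Read the raw full export that the previous task uploaded to HDFS.
raw = spark.read.csv(
    '{}ocid_full_{}-{}-{}.csv'.format(args.hdfs_source_dir, args.year, args.month, args.day),
    header=True, inferSchema=True)

# Keep only cells inside a rough bounding box around Germany and the radio
# types mentioned in the README, then reduce to the columns needed downstream.
germany = raw.filter(
    (col('lat') > 47.2) & (col('lat') < 55.1) &
    (col('lon') > 5.8) & (col('lon') < 15.1) &
    (col('radio').isin('GSM', 'UMTS', 'LTE')))

germany.select('radio', 'mcc', 'net', 'area', 'cell', 'lon', 'lat', 'range') \
    .write.mode('overwrite').partitionBy('radio').parquet(args.hdfs_target_dir)
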
Tasks of daily DAG
# Download diff database from ocid to local file on airflow fs
download_diff_dataset = HttpDownloadOperator(
    task_id='download_diff',
    #download_uri='https://opencellid.org/ocid/downloads?token={}&type=diff&file=OCID-diff-cell-export-{{ ds }}-T000000.csv.gz'.format(API_KEY),
    download_uri='http://193.196.53.117/ocid/OCID-diff-cell-export-{{ ds }}-T000000.csv.gz',
    save_to='/home/airflow/ocid/raw/ocid_diff_{{ ds }}.csv.gz',
    dag=daily_dag,
)


# Unzip the gzipped diff database to a csv file on the airflow fs
unzip_diff_dataset = UnzipFileOperator(
    task_id='unzip_diff',
    zip_file='/home/airflow/ocid/raw/ocid_diff_{{ ds }}.csv.gz',
    extract_to='/home/airflow/ocid/raw/ocid_diff_{{ ds }}.csv',
    dag=daily_dag,
)

# Clear local files within raw-directory on airflow fs
clear_local_raw_import_dir = ClearDirectoryOperator(
    task_id='clear_local_raw_directory',
    directory='/home/airflow/ocid/raw',
    pattern='*',
    dag=daily_dag,
)

# Move diff database to remote hdfs
hdfs_put_ocid_diff = HdfsPutFileOperator(
    task_id='upload_ocid_diff_hdfs',
    local_file='/home/airflow/ocid/raw/ocid_diff_{{ ds }}.csv',
    remote_file='/user/hadoop/ocid/work/ocid_diff_{{ ds }}.csv',
    hdfs_conn_id='hdfs',
    dag=daily_dag,
)

pyspark_ocid_diff_to_final = SparkSubmitOperator(
    task_id='pyspark_filter_reduce_diff_write_to_final_parquet',
    conn_id='spark',
    application='/home/airflow/airflow/python/ocid_diff_to_final_db.py',
    total_executor_cores='2',
    executor_cores='2',
    executor_memory='2g',
    num_executors='2',
    name='spark_raw_to_final_diff',
    verbose=True,
    application_args=[
        '--year', '{{ macros.ds_format(ds, "%Y-%m-%d", "%Y")}}',
        '--month', '{{ macros.ds_format(ds, "%Y-%m-%d", "%m")}}',
        '--day',  '{{ macros.ds_format(ds, "%Y-%m-%d", "%d")}}',
        '--hdfs_source_dir', '/user/hadoop/ocid/work/',
        '--hdfs_target_dir', '/user/hadoop/ocid/final/',
    ],
    dag=daily_dag
)
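
The task snippets above reference two DAG objects, initial_dag and daily_dag, whose declarations are not part of this excerpt. A minimal sketch of how they could be declared is shown below; the dag_ids, schedules and start date are assumptions, not taken from the repository.

from datetime import datetime
from airflow import DAG

# Hypothetical DAG declarations; dag_ids, schedule and start_date are assumptions.
initial_dag = DAG(
    dag_id='CellCoverage_initial',
    schedule_interval='@once',   # the full import is meant to run a single time
    start_date=datetime(2021, 1, 1),
    catchup=False,
)

daily_dag = DAG(
    dag_id='CellCoverage_daily',
    schedule_interval='@daily',  # fetch the OCID diff export once per day
    start_date=datetime(2021, 1, 1),
    catchup=False,
)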

Explanation of Cell_Coverage_initial DAG

This DAG is meant to run only once: it fetches the complete dataset (a gzipped csv), extracts it, uploads it to HDFS, reduces, partitions and filters it, and finally writes the initial load to the local and the remote database.

This DAG consists of the following tasks:

ID Description
create_local_import_dir Create a local directory within airflow, where the raw data gets downloaded to
create_remote_hdfs_dir_raw Create a corresponding HDFS directory, to which the extracted raw data (csv file) is later uploaded
create_remote_hdfs_dir_final Create a corresponding HDFS directory, in which the reduced final data (as a table) is later stored
download_initial_dataset Download the full database from OCID to a local file on the airflow fs
unzip_initial_dataset Unzip the gzipped full database to a csv file on the airflow fs
hdfs_put_ocid_initial Move the extracted full database to the remote HDFS
pyspark_ocid_full_to_final Filter to Germany only (based on coordinates), partition by radio type (GSM, UMTS, LTE) and write to the final table (HDFS parquet) as well as to the external MySQL database
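
How these tasks are chained together is not shown in this excerpt. One plausible, purely linear wiring, added at the end of the DAG file after the task definitions above (an assumption, not taken from the repository), would be:

# Hypothetical dependency chain for the initial DAG, inferred from the task
# descriptions; the actual ordering in the DAG file may differ.
create_local_import_dir >> create_remote_hdfs_dir_raw >> create_remote_hdfs_dir_final \
    >> download_initial_dataset >> unzip_initial_dataset \
    >> hdfs_put_ocid_initial >> pyspark_ocid_full_to_final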

Explanation of Cell_Coverage_daily DAG

This DAG is meant to run daily: it fetches only the current diff (a gzipped csv), extracts it, uploads it to HDFS, reduces, partitions and filters it, and finally appends the result to the local and the remote database.

This DAG consists of the following tasks:

ID Description
download_diff_dataset Download the diff database from OCID to a local file on the airflow fs
unzip_diff_dataset Unzip the gzipped diff database to a csv file on the airflow fs
clear_local_raw_import_dir Clear local files within the raw directory on the airflow fs
hdfs_put_ocid_diff Move the diff database to the remote HDFS
pyspark_ocid_diff_to_final Filter to Germany only (based on coordinates), partition by radio type (GSM, UMTS, LTE) and append to the final table (HDFS parquet) as well as to the external MySQL database
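
The daily job differs from the initial one mainly in that it appends the filtered diff to the existing data instead of rewriting it. The sketch below illustrates such an append step; the paths, schema, database name, table name and JDBC settings are assumptions, only the MySQL credentials come from the table further above.

# Hypothetical sketch of ocid_diff_to_final_db.py; it mirrors the full job above
# but appends instead of overwriting.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('spark_raw_to_final_diff').getOrCreate()

# Read the daily diff uploaded by hdfs_put_ocid_diff (date hard-coded here for brevity).
diff = spark.read.csv('/user/hadoop/ocid/work/ocid_diff_2021-01-01.csv',
                      header=True, inferSchema=True)

# Same Germany / radio-type filter as in the full job sketch above.
df_final = diff.filter(
    (col('lat') > 47.2) & (col('lat') < 55.1) &
    (col('lon') > 5.8) & (col('lon') < 15.1) &
    (col('radio').isin('GSM', 'UMTS', 'LTE')))

# Append to the parquet table on HDFS ...
df_final.write.mode('append').partitionBy('radio').parquet('/user/hadoop/ocid/final/')

# ... and to the external MySQL database (database and table names assumed).
df_final.write.mode('append').format('jdbc') \
    .option('url', 'jdbc:mysql://mysql:3306/ocid') \
    .option('driver', 'com.mysql.cj.jdbc.Driver') \
    .option('dbtable', 'cell_coverage') \
    .option('user', 'sqluser') \
    .option('password', 'password') \
    .save()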

Software-Design

Docker-Stack

Frontend

For the code/flow of the frontend business logic, see docker/nodered/

OpenCellID Crawler-Script

Because the OpenCellID API limits the number of download requests, a simple bash script was written; it is executed every night by a cronjob.

#!/bin/bash
# Mirror the current OpenCellID exports into the local webroot served by nginx.

DAY_STR=$(date +"%Y-%m-%d")
echo "$DAY_STR"

# Remove yesterday's files, then fetch today's diff export and the full export.
rm -rf /home/ubuntu/httpdata/ocid/*
wget -O "/home/ubuntu/httpdata/ocid/OCID-diff-cell-export-${DAY_STR}-T000000.csv.gz" "https://opencellid.org/ocid/downloads?token=pk.1d747aefca776719299e26f04d7d331c&type=diff&file=OCID-diff-cell-export-${DAY_STR}-T000000.csv.gz"
wget -O "/home/ubuntu/httpdata/ocid/cell_towers.csv.gz" "https://opencellid.org/ocid/downloads?token=pk.1d747aefca776719299e26f04d7d331c&type=full&file=cell_towers.csv.gz"

echo "Done!"

A simple nginx webserver serves these files, providing an HTTP endpoint of our own that the corresponding DAG tasks can download from.

Appendix

OCID-API Structure

Screenshots

Partitioned and reduced table stored as parquet on HDFS

Working directory on HDFS, where the extracted raw data is stored and processed further

Count of reduced records in the final MySQL database after the first run of the initial DAG

Docker-Compose Stack