The Analytical Dataset Generation (ADG) cluster converts the latest versions of all records in specified HBase tables into Parquet files stored on S3. It then generates Hive tables to provide downstream data processing & analytics tasks with convenient SQL access to that data.
- At a defined time, a CloudWatch event triggers the `EMR Launcher` Lambda function.
- The `EMR Launcher` reads EMR cluster configuration files from the `Config` S3 bucket, then calls the `RunJobFlow` API of the EMR service, which results in an `Analytical Dataset Generator` (`ADG`) EMR cluster being launched (a simplified sketch of this call is shown after this list).
- The `ADG Cluster` is configured as a read-replica of the `Ingest HBase` EMR cluster; a PySpark step run on the cluster reads HBase Storefiles from the `Input` S3 bucket and produces Parquet files in the `Output` S3 bucket.
- The PySpark step then creates external Hive tables over those S3 objects, storing the table definitions in a Glue database.
- Once processing is complete, the `ADG Cluster` terminates.
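For illustration, a minimal sketch of the launch step follows, assuming a Lambda handler that reads a JSON cluster definition from a placeholder `Config` bucket and key; the real EMR Launcher's configuration format and names may differ.

```python
# Illustrative sketch only: bucket name, key and configuration format are assumptions,
# not the real EMR Launcher implementation.
import json

import boto3

s3 = boto3.client("s3")
emr = boto3.client("emr")


def handler(event, context):
    # Read the EMR cluster configuration from the Config S3 bucket (placeholder names).
    config_object = s3.get_object(Bucket="adg-config-bucket", Key="emr/adg-cluster.json")
    cluster_config = json.loads(config_object["Body"].read())

    # Call the RunJobFlow API to launch the ADG EMR cluster.
    response = emr.run_job_flow(**cluster_config)
    return {"JobFlowId": response["JobFlowId"]}
```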
As our logs go to CloudWatch, we can use CloudWatch Logs Insights to gather data and metrics about the ADG runs.
Make sure you set the time range or you'll get odd results.
Queries cannot be saved per se; however, you can see previously used Insights queries in the account by going to:
"CloudWatch | Logs | Insights (left menu bar) | Actions (button) | View query history for this account"
If you put a comment as the first line of a query, it can serve as a proxy title.
```
# Time taken for each & all collections
fields @timestamp, @message
| parse @message "{ 'timestamp':* 'log_level':* 'message': 'time taken for*:*'" as timestamp, log_level, collection, timetaken
| display collection, timetaken
| sort timetaken desc
```
- Pull the latest python3-pyspark-pytest image by running:

  ```
  docker pull dwpdigital/python3-pyspark-pytest
  ```

- Run:

  ```
  docker run -it --rm --name adg-docker -v "$(pwd)":/install -w /install dwpdigital/python3-pyspark-pytest pytest .
  ```

- If the unit tests fail when run locally, make sure to delete the locally generated directories `metastore_db`, `spark-temp` and `spark-warehouse` before re-running.
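If clearing those directories by hand becomes tedious, a small helper along these lines (illustrative only, not part of the repo) can be used:

```python
# Remove locally generated Spark/Hive artefacts that can cause unit test failures.
import shutil

for stale_dir in ("metastore_db", "spark-temp", "spark-warehouse"):
    shutil.rmtree(stale_dir, ignore_errors=True)
```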
To export application and OS metrics, JMX exporter and node exporter were chosen. They integrate with the existing metrics infrastructure and allow for metrics to be scraped by Prometheus. These metrics should then be queryable in Thanos.
- Add the JMX Java agent to the pom.xml file that is used to download the jars:

  ```xml
  <dependency>
      <groupId>io.prometheus.jmx</groupId>
      <artifactId>jmx_prometheus_javaagent</artifactId>
      <version>0.14.0</version>
  </dependency>
  ```

  This downloads the jmx_prometheus_javaagent jar from Maven. This has to happen as a bootstrap action as it needs to be present at application setup.
- A script needs to be configured to access Maven Central so that the dependencies mentioned in the pom file above can be downloaded. An example can be found here.
- Create a config file for the JMX exporter as explained in the Configuration section of the Prometheus GitHub page. As the JMX exporter is run as a Java agent, no URL or port needs to be specified in this file.

  ```yaml
  ---
  lowercaseOutputName: true
  rules:
    - pattern: '.*'
  ```

  This config captures all metrics found by JMX.
- Edit the cluster launch configuration to start applications with the JMX exporter running as a Java agent, e.g. the hadoop-env configuration:

  ```yaml
  - Classification: "hadoop-env"
    Configurations:
      - Classification: "export"
        Properties:
          "HADOOP_NAMENODE_OPTS": "\"-javaagent:/opt/emr/metrics/dependencies/jmx_prometheus_javaagent-0.14.0.jar=7101:/opt/emr/metrics/prometheus_config.yml\""
          "HADOOP_DATANODE_OPTS": "\"-javaagent:/opt/emr/metrics/dependencies/jmx_prometheus_javaagent-0.14.0.jar=7103:/opt/emr/metrics/prometheus_config.yml\""
  ```

  The Java agent needs to be configured to start on an unused port. If running multiple agents, they need to run on different ports: in the hadoop-env example these are 7101 and 7103.
- Add an ingress/egress security group rule to allow Prometheus connectivity over port 9090.
- Add ingress/egress security group rules to allow communication between Prometheus and your service on the JMX port.
- Add a scrape config to Prometheus to discover metrics on the JMX exporter port.

  The port defined in the config aligns with the ingress/egress rules and determines where Prometheus looks for metrics. This re-label config replaces instance labels that show up as IP addresses with the value of the EC2 tag `Name`:

  ```yaml
  relabel_configs:
    - source_labels: [__meta_ec2_tag_Name]
      regex: (.*)
      target_label: instance
      replacement: $1
      action: replace
  ```

  - `source_labels` is what it looks for; in this case the value of the EC2 tag `Name`.
  - `regex` is the pattern to match for the source label.
  - `target_label` is the label to replace.
  - `replacement` is the regex group to replace it with; in this case the value of the tag.

- Re-label the instances to differentiate between EMR nodes without mentioning the IP:
  ```bash
  export AWS_DEFAULT_REGION=${aws_default_region}
  UUID=$(dbus-uuidgen | cut -c 1-8)
  TOKEN=$(curl -X PUT -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" "http://169.254.169.254/latest/api/token")
  export INSTANCE_ID=$(curl -H "X-aws-ec2-metadata-token:$TOKEN" -s http://169.254.169.254/latest/meta-data/instance-id)
  export INSTANCE_ROLE=$(jq .instanceRole /mnt/var/lib/info/extraInstanceData.json)
  export HOSTNAME=${name}-$${INSTANCE_ROLE//\"}-$UUID
  hostname $HOSTNAME
  aws ec2 create-tags --resources $INSTANCE_ID --tags Key=Name,Value=$HOSTNAME
  ```
  where `name` is the service name.
Node exporter is used for gathering OS metrics and comes pre-installed with the EMR AMI images. Node exporter runs on port 9100.
To set it up, repeat steps 6 to 9 of the JMX exporter set-up for the node exporter port.
There is a DynamoDB table named `data_pipeline_metadata` which tracks the status of all ADG runs. In order to populate this table, the `emr-setup` bootstrap step kicks off a shell script named `update_dynamo.sh` which runs in the background.
This script waits for certain files to exist on the local file system. These files contain information passed to the cluster from SNS (like the correlation id), and the first step of the cluster saves them to the files.
When the files are found, the script updates DynamoDB with a row for the current run of the cluster (if the row already exists, it's a retry scenario, see below). The script then loops in the background for the lifecycle of the cluster. When a step is completed, the current step field is updated in DynamoDB, and when the cluster has finished the status is updated with the final cluster status. Cancelled clusters are set to failed.
If a cluster fails, the status in the DynamoDB table is updated to failed. When a new cluster starts, before it inserts the new DynamoDB row, it checks whether one already exists. If it does, it checks the last run step and the status, and if the status is failed, it saves off this status to a local file.
Whenever a step starts on a cluster, it calls a common method which checks if this local file exists. If it does not (i.e. this is not a retry scenario), the step continues as normal. However, if the file does exist, the step checks whether the failed cluster was running the same step when it failed. If it was, the step runs as normal and the local file is deleted so as not to affect subsequent steps. If the step name does not match, this step is assumed to have completed before and is therefore skipped this time.
In this way, we are able to retry the entire cluster without repeating steps that have already succeeded, potentially saving hours of time in retry scenarios.
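As a rough illustration of the step-skipping logic described above (the real common method lives in the cluster's step code; the file path and step-name handling here are assumptions):

```python
import os

# Hypothetical location of the local file written when the previous run failed.
RETRY_STATUS_FILE = "/opt/emr/previous_failed_step.txt"


def should_skip_step(current_step_name: str) -> bool:
    """Return True if this step succeeded in the failed run and can be skipped."""
    if not os.path.exists(RETRY_STATUS_FILE):
        # Not a retry scenario: run the step as normal.
        return False

    with open(RETRY_STATUS_FILE) as marker:
        failed_step_name = marker.read().strip()

    if failed_step_name == current_step_name:
        # The previous cluster failed on this step, so run it again and remove the
        # marker so that subsequent steps are not affected.
        os.remove(RETRY_STATUS_FILE)
        return False

    # The step name does not match, so this step is assumed to have completed before.
    return True
```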
There is a Concourse pipeline for ADG named `analytical-dataset-generation`. The code for this pipeline is in the `ci` folder. The main part of the pipeline (the `master` group) deploys the infrastructure and runs the e2e tests. There are a number of groups for rotating passwords and there are also admin groups for each environment.
There are a number of available admin jobs for each environment. These can be found in the Concourse Utility pipeline.
This job will start an ADG cluster running. In order to make the cluster do what you want it to do, you can alter the following environment variables in the pipeline config and then run `aviator` to update the pipeline before kicking it off:

- `S3_PREFIX` (required) -> the S3 output location for the HTME data to process, e.g. `businessdata/mongo/ucdata/2021-04-01/full`
- `EXPORT_DATE` (required) -> the date the data was exported, e.g. `2021-04-01`
- `CORRELATION_ID` (required) -> the correlation id for this run, e.g. `generate_snapshots_preprod_generate_full_snapshots_4_full`
- `SNAPSHOT_TYPE` (required) -> either `full` or `incremental` for the type of ADG to start
- `SKIP_PDM_TRIGGER` (optional) -> if not provided, the environment-wide Terraform default is used to decide whether ADG should trigger PDM when finished; if provided, it must be either `true` or `false` to explicitly trigger or not trigger PDM (note this only applies to ADG-full clusters; this value is ignored for ADG-incremental)
For stopping clusters, you can run either the `stop-full-clusters` job to terminate ALL current `ADG-full` clusters on the environment, or the `stop-incremental-clusters` job to terminate ALL current `ADG-incremental` clusters on the environment.
Sometimes the ADG cluster is required to restart from the beginning instead of restarting from the failure point.
To be able to do a full cluster restart, delete the associated DynamoDB row if it exists. The keys to the row are `Correlation_Id` and `DataProduct` in the DynamoDB table storing cluster state information (see Retries).
The `clear-dynamodb-row` job is responsible for carrying out the row deletion.
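The deletion itself is a keyed delete against the `data_pipeline_metadata` table; a minimal boto3 sketch (illustrative only, not the job's actual implementation) looks like this:

```python
import boto3


def clear_pipeline_row(correlation_id: str, data_product: str) -> None:
    # Delete the row tracking the failed run so the next cluster starts from scratch.
    table = boto3.resource("dynamodb").Table("data_pipeline_metadata")
    table.delete_item(Key={"Correlation_Id": correlation_id, "DataProduct": data_product})
```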
To do a full cluster restart:
- Manually enter the CORRELATION_ID and DATA_PRODUCT of the row to delete in the `clear-dynamodb-row` job and run aviator:

  ```yaml
  jobs:
    - name: dev-clear-dynamodb-row
      plan:
        - .: (( inject meta.plan.clear-dynamodb-row ))
          config:
            params:
              AWS_ROLE_ARN: arn:aws:iam::((aws_account.development)):role/ci
              AWS_ACC: ((aws_account.development))
              CORRELATION_ID: <Correlation_Id of the row to delete>
              DATA_PRODUCT: <DataProduct of the row to delete>
  ```
- Run the `<env>-clear-dynamodb-row` admin job.

- You can then run the `start-cluster` job afresh with the same `Correlation_Id`.
There is an automated AMI upgrade pipeline embedded into the pipeline of this repo (`ci/jobs/ami-test`). This is in a `serial_group` with the QA deployment pipeline to ensure that they do not interfere with each other.
Please let the tests run and the deployment pipeline will continue automatically.
There is a requirement for our data products to start using Hive 3 instead of Hive 2. Hive 3 comes bundled with EMR 6.2.0 along with other upgrades including Spark. Below is a list of steps taken to upgrade ADG to EMR 6.2.0
- Make sure you are using an AL2 AMI.
- Point ADG at the new metastore (`hive_metastore_v2` in `internal-compute`) instead of the old one in configurations.yml. The values below should resolve to the new metastore, the details of which are an output of `internal-compute`:

  ```yaml
  "javax.jdo.option.ConnectionURL": "jdbc:mysql://${hive_metastore_endpoint}:3306/${hive_metastore_database_name}?createDatabaseIfNotExist=true"
  "javax.jdo.option.ConnectionUserName": "${hive_metsatore_username}"
  "javax.jdo.option.ConnectionPassword": "${hive_metastore_pwd}"
  ```
- Create ingress/egress security group rules to the metastore in the `internal-compute` repo. Example below:

  ```hcl
  resource "aws_security_group_rule" "ingress_adg" {
    description              = "Allow mysql traffic to Aurora RDS from ADG"
    from_port                = 3306
    protocol                 = "tcp"
    security_group_id        = aws_security_group.hive_metastore_v2.id
    to_port                  = 3306
    type                     = "ingress"
    source_security_group_id = data.terraform_remote_state.adg.outputs.adg_common_sg.id
  }

  resource "aws_security_group_rule" "egress_adg" {
    description              = "Allow mysql traffic to Aurora RDS from ADG"
    from_port                = 3306
    protocol                 = "tcp"
    security_group_id        = data.terraform_remote_state.adg.outputs.adg_common_sg.id
    to_port                  = 3306
    type                     = "egress"
    source_security_group_id = aws_security_group.hive_metastore_v2.id
  }
  ```
- Rotate the `adg-writer` user from the `internal-compute` pipeline so that when ADG starts up it can log in to the metastore.

- Give IAM permissions to the ADG EMR launcher to read the new secret:

  ```hcl
  data "aws_iam_policy_document" "adg_emr_launcher_getsecrets" {
    statement {
      effect = "Allow"

      actions = [
        "secretsmanager:GetSecretValue",
      ]

      resources = [
        data.terraform_remote_state.internal_compute.outputs.metadata_store_users.adg_writer.secret_arn,
      ]
    }
  }
  ```
- In configurations.yml, change `spark.executor.extraJavaOptions` to `spark.executor.defaultJavaOptions`.
- Upgrade any dependencies that reference Scala 2.11 or Spark 2.4. For ADG, the metrics dependency had to be bumped to the new version of Scala:

  ```xml
  <dependency>
      <groupId>com.banzaicloud</groupId>
      <artifactId>spark-metrics_2.12</artifactId>
      <version>2.4-1.0.6</version>
  </dependency>
  ```
- Bump the EMR version to 6.2.0 and launch the cluster.
Make sure that the first time anything uses the metastore it initialises with Hive 3, otherwise it will have to be rebuilt.
In order to generate status metrics, the `emr-setup` bootstrap step kicks off a shell script named `status_metrics.sh` which runs in the background.
This script loops in the background for the lifecycle of the cluster and sends a metric called `adg_status` to the ADG pushgateway. This metric has the following values, which map to the cluster status:
| Cluster Status | Metric Value |
|---|---|
| Running | 1 |
| Completed | 2 |
| Failed | 3 |
| Cancelled | 4 |
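For illustration, pushing such a gauge could look like the following sketch, assuming the `prometheus_client` library and a placeholder pushgateway address (the real `status_metrics.sh` is a shell script and may differ):

```python
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

# Mapping of cluster status to the adg_status metric value (from the table above).
STATUS_VALUES = {"RUNNING": 1, "COMPLETED": 2, "FAILED": 3, "CANCELLED": 4}


def push_adg_status(cluster_status: str, pushgateway: str = "adg-pushgateway:9091") -> None:
    registry = CollectorRegistry()
    gauge = Gauge("adg_status", "Current status of the ADG cluster", registry=registry)
    gauge.set(STATUS_VALUES[cluster_status.upper()])
    push_to_gateway(pushgateway, job="adg_status_metrics", registry=registry)
```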
ADG-Full has been failing randomly on the Production environment with the below error:

```
botocore.exceptions.ClientError: An error occurred (RequestTimeTooSkewed) when calling the GetObject operation:
The difference between the request time and the current time is too large.
```
This error occurs while doing a boto3 GetObject operation in ADG's Spark code. It occurs when there is a difference between the local time and the time used by S3 when making boto3 requests. Amazon S3 uses NTP (Network Time Protocol) to keep its system clocks accurate; NTP provides a standard way of synchronizing computer clocks on servers.
As ADG runs on EMR and reads/writes data in S3, there is no local system involved. We have raised the following support tickets with the Amazon support team:

- Case ID 8420147331
- Case ID 8383474671

Initially the AWS support team suggested raising this with the S3 team and then passed it on to the EMR team.
On a call with the AWS support team it was discussed that troubleshooting this issue is difficult as it occurs randomly and the EMR cluster is no longer alive to investigate the error. AWS EMR uses the chronyd daemon for its Time Sync Service, and possibly this error occurs when the chronyd service stops. They have requested the complete error trace the next time it happens, which would include the S3 request ID made from EMR to S3.
In order to get the complete error trace, the following steps were suggested and developed as part of the DW-6406 ticket:

- Export Chrony logs (path = /var/log/chrony) to CloudWatch.
- Whenever we call boto3 get_object, set the log level to debug. This allows us to capture more information, such as the RequestID, which is useful to the AWS support team. Then put the log level back to its original value.
- Log the current UTC time whenever we call boto3 get_object. This is to compare with the time generated on the S3 side to see if they vary.
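A rough sketch of the get_object wrapper implied by the last two steps (illustrative only; the real change was made as part of DW-6406 and may differ):

```python
import logging
from datetime import datetime, timezone

import boto3


def get_object_with_debug(s3_client, bucket: str, key: str):
    boto_logger = logging.getLogger("botocore")
    original_level = boto_logger.level

    # Log the local UTC time so it can be compared with the time S3 used for the request.
    logging.info("Calling get_object at UTC %s", datetime.now(timezone.utc).isoformat())

    # Temporarily raise botocore logging to DEBUG to capture the S3 RequestId.
    boto_logger.setLevel(logging.DEBUG)
    try:
        return s3_client.get_object(Bucket=bucket, Key=key)
    finally:
        # Put the log level back to its original value.
        boto_logger.setLevel(original_level)
```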
The next step is, whenever this error happens, to check the complete error trace from the logs and contact the AWS support team.
In production, all data generated by ADG in the published bucket is governed by S3 lifecycle policies:

- prefix analytical-dataset/full - https://github.ucds.io/dip/aws-common-infrastructure/blob/3ea2f98667df49455be57f35c0ebfd6b79126d15/s3.tf#L287
- prefix analytical-dataset/incremental - https://github.ucds.io/dip/aws-common-infrastructure/blob/3ea2f98667df49455be57f35c0ebfd6b79126d15/s3.tf#L297
- prefix analytical-dataset/mongo-latest - https://github.ucds.io/dip/aws-common-infrastructure/blob/3ea2f98667df49455be57f35c0ebfd6b79126d15/s3.tf#L307
The prefixes analytical-dataset/hive and analytical-dataset/data are not covered by the prefix-specific policies above. However, there is a bucket-level policy that purges previous versions every 30 days: https://github.ucds.io/dip/aws-common-infrastructure/blob/3ea2f98667df49455be57f35c0ebfd6b79126d15/s3.tf#L277
Going forward, if any new prefix like analytical-dataset/xxxx is added, an explicit lifecycle purge policy needs to be defined so that objects expire appropriately.
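The existing rules are defined in the Terraform linked above; purely as an illustration of the shape of such an expiry rule, a boto3 sketch for a hypothetical new prefix could look like this (bucket name, prefix and retention period are placeholders):

```python
import boto3

s3 = boto3.client("s3")

# Placeholder bucket, prefix and retention period for a hypothetical new prefix.
s3.put_bucket_lifecycle_configuration(
    Bucket="published-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-analytical-dataset-new-prefix",
                "Filter": {"Prefix": "analytical-dataset/new-prefix/"},
                "Status": "Enabled",
                "Expiration": {"Days": 30},
            }
        ]
    },
)
```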
Strange behaviour was observed when a Hive join operation is performed between tables created by Spark and tables created by Hive, including records with the same value in the two tables not showing up. During investigation as part of the ticket https://projects.ucd.gpn.gov.uk/browse/DW-7280, we found that Hive versions are fully compatible with only certain versions of Spark, as documented here: https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started#HiveonSpark:GettingStarted-VersionCompatibility. As EMR ships with its own combination of Hive and Spark versions, this issue was resolved by creating all tables in Hive, while other operations/transformations can still be done in both Hive and Spark.