The repo contains a set of Python and R utilities for pushing Spark RDDs to distributed MariaDB databases. The need for such utilities arises from the fact that table schemas in MariaDB are not propagated automatically from the frontend instance to the backend instances. The utilities in this repo implement that propagation.
Clone/download the repo and source the R scripts from your script. It is also possible to source them directly from the repo using:
library(RCurl)  # getURL() is provided by the RCurl package
myURL <- c(
    "https://raw.githubusercontent.com/goshevs/sparkDBUtilities/master/R/sparkArgsParser.R",
    "https://raw.githubusercontent.com/goshevs/sparkDBUtilities/master/R/sparkToDistMDB.R")
eval(parse(text = getURL(myURL[1], ssl.verifypeer = FALSE)))
eval(parse(text = getURL(myURL[2], ssl.verifypeer = FALSE)))
Clone/download the repo and import the modules in your script. It may be necessary to update the `PYTHONPATH` environment variable with the location of the scripts to make them discoverable by Python.
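For example, a minimal Python sketch (the module file names `sparkArgsParser.py` and `sparkToDistMDB.py` and their location in the repo are assumptions based on the R file names):

```python
import sys

# Assumed location of the repo's Python modules; adjust to your checkout.
sys.path.append("/path/to/sparkDBUtilities/python")

# Module names are assumed to mirror the R script names.
import sparkArgsParser
import sparkToDistMDB
```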
All R and Python functions have identical syntax, which should ease transitions between the two languages.
This script/module contains two primary functions:
- `pushAdminToMDB`: sets up connections, access credentials, and rights among the frontend and backend MariaDB instances. The function is only needed when pushing to a distributed database.
- `pushSchemaToMDB`: pushes the schema of an RDD to the database. The function is designed to work with distributed databases but can also be used with non-distributed instances of MariaDB. The advantage of using it in the latter case is that it sets up an auto-increment field `id` which can be used as a partitioning variable for reading a database table into Spark.
The script/module also contains four utility functions:
- `getSchema`: retrieves the RDD schema.
- `partitionByListColumn`: a string writer corresponding to partitioning by LIST COLUMNS. At this time, `partitioning_expression` can only be a variable name, i.e. expressions are not supported.
- `partitionByHash`: a string writer corresponding to partitioning by HASH (see Use case 2).
- `partitionByRangeColumn`: a string writer corresponding to partitioning by RANGE COLUMNS.
`pushSchemaToMDB` accepts user-supplied partitioning strings and therefore users are not confined to these three types of partitioning logic.
This script/module contains the function `parseArguments`, which parses the arguments passed to `$MY_SPARK_JOBSCRIPT` as defined in the SparkHPC setup scripts.
In R, `parseArguments` outputs a list, while in Python it outputs a dictionary, with the following keys:
- `dataSet`: /path/to/dataset/file
- `dbName`: name of the database to write to
- `dbNode`: master node of the database
- `dbPort`: port of the frontend db node
- `dbUser`: user name for logging in to the frontend db node
- `dbPass`: password for logging in to the frontend db node
- `dbUrl`: jdbc database connection string
- `dbNodes`: list of all nodes on the db cluster
- `dbBEUser`: user name for logging in to the backend db servers
- `dbBEPass`: password for `dbBEUser`
- `dbBENodes`: list of db backend nodes
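As an illustration only, a Python sketch of consuming the parsed arguments; exactly how `parseArguments` receives the job-script arguments is an assumption here (`sys.argv` is used as a stand-in):

```python
import sys
from sparkArgsParser import parseArguments  # import path is an assumption

# Assumption: the function accepts the argument list passed to $MY_SPARK_JOBSCRIPT.
args = parseArguments(sys.argv[1:])

# The returned dictionary exposes the keys listed above.
jdbc_url = args["dbUrl"]
frontend = args["dbNode"]
backends = args["dbBENodes"]
```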
Utility `pushAdminToMDB` has the following syntax:
pushAdminToMDB(dbNodes, dbBENodes, dbPort,
               dbUser, dbPass, dbName,
               groupSuffix, debug)
Where:
- `dbNodes`: list of all nodes on the db cluster
- `dbBENodes`: list of db backend nodes
- `dbPort`: port of the backend nodes (default: 3306)
- `dbUser`: user name for logging in to the backend nodes
- `dbPass`: password for logging in to the backend nodes
- `dbName`: name of the database to write to
- `groupSuffix`: the tag in the `.my.cnf` file to refer to for login information when logging in to the db
- `debug`: if TRUE/True, prints out all commands instead of executing them (default: FALSE/False)
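A hypothetical Python call; all node names, credentials, and the database name below are placeholders:

```python
from sparkToDistMDB import pushAdminToMDB  # import path is an assumption

dbNodes   = ["node01", "node02", "node03"]   # placeholder: frontend plus backends
dbBENodes = ["node02", "node03"]             # placeholder: backends only

# debug=True only prints the generated commands instead of executing them.
pushAdminToMDB(dbNodes, dbBENodes, dbPort=3306,
               dbUser="sparkuser", dbPass="secret", dbName="mydb",
               groupSuffix="spark", debug=True)
```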
Utility `pushSchemaToMDB` has the following syntax:
pushSchemaToMDB(dbNodes, dbName, dbTableName, tableSchema,
                partColumn, partitionString, groupSuffix,
                debug)
Where:
- `dbNodes`: list of all nodes on the db cluster
- `dbName`: name of the database to write to
- `dbTableName`: name of the table to create
- `tableSchema`: the schema of the RDD to be written to the db
- `partColumn`: list of columns for partitioning
- `partitionString`: string with db partitioning commands
- `groupSuffix`: the tag in the `.my.cnf` file to refer to for db login information
- `debug`: if TRUE/True, prints out all commands instead of executing them (default: FALSE/False)
If pushing to a non-distributed database instance or to the frontend of a distributed database instance:
- `dbNodes` should be a string with the name of the node to push to
- the arguments `partColumn`, `partitionString`, and `changeType` should be omitted
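A hypothetical Python sketch of a distributed push; the DataFrame, node names, and table name are placeholders, the import path and the argument-free call to `makeJdbcKey` are assumptions, and the partitioning string is built with `partitionByHash` (documented below):

```python
from pyspark.sql import SparkSession
from sparkToDistMDB import (pushSchemaToMDB, getSchema,
                            makeJdbcKey, partitionByHash)  # import path is an assumption

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "east"), (2, "west")], ["id", "region"])

tableSchema     = getSchema(df, makeJdbcKey())        # see getSchema below
dbNodes         = ["node01", "node02", "node03"]      # placeholder cluster nodes
beNodes         = ["node02", "node03"]                # placeholder backend nodes
partitionString = partitionByHash("id", beNodes)      # see partitionByHash below

# Distributed push; for a non-distributed instance pass a single node name as
# dbNodes and omit partColumn and partitionString.
pushSchemaToMDB(dbNodes, "mydb", "mytable", tableSchema,
                partColumn=["id"], partitionString=partitionString,
                groupSuffix="spark", debug=True)       # debug=True only prints the commands
```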
Utility `getSchema` has the following syntax:
getSchema(RDD, key, changeType)
Where:
- `RDD`: a Spark RDD
- `key`: the key that matches Spark SQL column types to MariaDB column types. Currently, DecimalType is not supported. A default key is provided by the `makeJdbcKey` functions included in the respective R and Python `sparkToDistMDB` files.
- `changeType`: optional list (in R) or dictionary (in Python) containing key-value pairs of column name and column type with the changes the user wishes to make to the Spark-created/inferred schema
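A Python sketch, assuming `getSchema` and `makeJdbcKey` are importable as above and that `makeJdbcKey` takes no arguments; the DataFrame and the MariaDB type string are placeholders:

```python
from pyspark.sql import SparkSession
from sparkToDistMDB import getSchema, makeJdbcKey   # import path is an assumption

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "east", 3.5)], ["id", "region", "score"])

# Optional overrides of the inferred column types (placeholder type string).
changeType = {"region": "VARCHAR(16)"}

tableSchema = getSchema(df, makeJdbcKey(), changeType)
```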
Utility `partitionByListColumn` has the following syntax:
partitionByListColumn(partitionRules, beNodes, tableSchema, defaultAdd)
Where:
- `partitionRules`: list (in R) or dictionary (in Python) with partitioning rules. The admissible structures/containers are:
  - R: a list of named lists (where names are RDD column names)
  - Python: a default dictionary where every value is a list
- `beNodes`: list of db backend nodes
- `tableSchema`: the schema of the RDD to be written to the db
- `defaultAdd`: add the `DEFAULT` partitioning provision to the partitioning rules (default: TRUE/True). This feature is supported on MariaDB 10.2 and higher.
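A Python sketch of building a LIST COLUMNS partitioning string; the nodes, column values, and schema are placeholders, and the exact way the listed values map onto the backend partitions is an assumption:

```python
from collections import defaultdict
from pyspark.sql import SparkSession
from sparkToDistMDB import partitionByListColumn, getSchema, makeJdbcKey  # import path is an assumption

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "east"), (2, "west")], ["id", "region"])
tableSchema = getSchema(df, makeJdbcKey())

beNodes = ["node02", "node03"]          # placeholder backend nodes

# Default dictionary where every value is a list, as described above.
partitionRules = defaultdict(list)
partitionRules["region"] = ["east", "west"]   # placeholder value-to-partition mapping

partitionString = partitionByListColumn(partitionRules, beNodes, tableSchema, True)
```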
Utility `partitionByHash` has the following syntax:
partitionByHash(partColumn, beNodes)
Where:
- `partColumn`: an RDD column name
- `beNodes`: list of db backend nodes
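A short Python sketch; the column and node names are placeholders:

```python
from sparkToDistMDB import partitionByHash   # import path is an assumption

beNodes = ["node02", "node03"]               # placeholder backend nodes
partitionString = partitionByHash("id", beNodes)   # partition by HASH of the `id` column
```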
Utility `partitionByRangeColumn` has the following syntax:
partitionByRangeColumn(partitionRules, beNodes, tableSchema,
                       maxValAdd, sortVal)
Where:
- `partitionRules`: list (in R) or dictionary (in Python) with partitioning rules. The admissible structures/containers are:
  - R: a named list of vectors (where names are RDD column names)
  - Python: a default dictionary where every value is a list
- `beNodes`: list of db backend nodes
- `tableSchema`: the schema of the RDD to be written to the db
- `maxValAdd`: add `maxvalue` clauses to the partitioning rules (default: TRUE/True)
- `sortVal`: sorts the partitioning values in increasing order (default: TRUE/True). Applicable if partitioning by a single column.
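A Python sketch mirroring the list-column example; the range boundaries and their mapping onto the backend partitions are assumptions for illustration:

```python
from collections import defaultdict
from pyspark.sql import SparkSession
from sparkToDistMDB import partitionByRangeColumn, getSchema, makeJdbcKey  # import path is an assumption

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 10), (2, 75)], ["id", "score"])
tableSchema = getSchema(df, makeJdbcKey())

beNodes = ["node02", "node03"]          # placeholder backend nodes

# Default dictionary where every value is a list of range boundaries (placeholders).
partitionRules = defaultdict(list)
partitionRules["score"] = [50, 100]

partitionString = partitionByRangeColumn(partitionRules, beNodes, tableSchema,
                                         maxValAdd=True, sortVal=True)
```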
Users should consider the following constraints when using this software:
- The number of partitions must equal the number of backend db instances.
- As mentioned above, columns of DecimalType are not supported at this time.
- `partitionByListColumn` accepts column names only, as expressions are not supported at this time.
Please see the `examples` directory for a use case in Python and R.