## Convert MySQL Create Table to PySpark CreateDataFrame

#### Example MySQL Create scripts

In [7]:
mysql_script = """
-- Create syntax for TABLE 'atable'
CREATE TABLE `atable` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `anotherid` int(11) NOT NULL DEFAULT '0',
  `thirdid` int(11) NOT NULL DEFAULT '0',
  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `fourthid` int(11) DEFAULT NULL,
  PRIMARY KEY (`anotherid`),
) ENGINE=InnoDB AUTO_INCREMENT=BIGNUM DEFAULT CHARSET=utf8;

-- Create syntax for TABLE 'btable'
CREATE TABLE `btable` (
  `btableid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `id` int(11) NOT NULL DEFAULT '0',
  `fifthid` int(11) NOT NULL DEFAULT '0',
  `type` int(4) NOT NULL DEFAULT '0',
  `day` date DEFAULT NULL,
) ENGINE=InnoDB AUTO_INCREMENT=3170072820 DEFAULT CHARSET=utf8
/*!50100 PARTITION BY HASH (districtid)
PARTITIONS 100 */;
"""
script = [x for x in mysql_script.split("\n") if x !=""]
script

["-- Create syntax for TABLE 'atable'",
 'CREATE TABLE `atable` (',
 '  `id` bigint(20) NOT NULL AUTO_INCREMENT,',
 "  `anotherid` int(11) NOT NULL DEFAULT '0',",
 "  `thirdid` int(11) NOT NULL DEFAULT '0',",
 '  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,',
 '  `fourthid` int(11) DEFAULT NULL,',
 '  PRIMARY KEY (`anotherid`),',
 ') ENGINE=InnoDB AUTO_INCREMENT=BIGNUM DEFAULT CHARSET=utf8;',
 "-- Create syntax for TABLE 'btable'",
 'CREATE TABLE `btable` (',
 '  `btableid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,',
 "  `id` int(11) NOT NULL DEFAULT '0',",
 "  `fifthid` int(11) NOT NULL DEFAULT '0',",
 "  `type` int(4) NOT NULL DEFAULT '0',",
 '  `day` date DEFAULT NULL,',
 ') ENGINE=InnoDB AUTO_INCREMENT=3170072820 DEFAULT CHARSET=utf8',
 '/*!50100 PARTITION BY HASH (districtid)',
 'PARTITIONS 100 */;']

### get rid of comment lines

In [8]:
import re
def filter_comments(text, snippet="--"):
    return [x for x in text if re.findall(snippet,x)!=[snippet]]
filter_comments(script,"--")

['CREATE TABLE `atable` (',
 '  `id` bigint(20) NOT NULL AUTO_INCREMENT,',
 "  `anotherid` int(11) NOT NULL DEFAULT '0',",
 "  `thirdid` int(11) NOT NULL DEFAULT '0',",
 '  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,',
 '  `fourthid` int(11) DEFAULT NULL,',
 '  PRIMARY KEY (`anotherid`),',
 ') ENGINE=InnoDB AUTO_INCREMENT=BIGNUM DEFAULT CHARSET=utf8;',
 'CREATE TABLE `btable` (',
 '  `btableid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,',
 "  `id` int(11) NOT NULL DEFAULT '0',",
 "  `fifthid` int(11) NOT NULL DEFAULT '0',",
 "  `type` int(4) NOT NULL DEFAULT '0',",
 '  `day` date DEFAULT NULL,',
 ') ENGINE=InnoDB AUTO_INCREMENT=3170072820 DEFAULT CHARSET=utf8',
 '/*!50100 PARTITION BY HASH (districtid)',
 'PARTITIONS 100 */;']

In [9]:
no_comment=filter_comments(script,"--")
no_comment

['CREATE TABLE `atable` (',
 '  `id` bigint(20) NOT NULL AUTO_INCREMENT,',
 "  `anotherid` int(11) NOT NULL DEFAULT '0',",
 "  `thirdid` int(11) NOT NULL DEFAULT '0',",
 '  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,',
 '  `fourthid` int(11) DEFAULT NULL,',
 '  PRIMARY KEY (`anotherid`),',
 ') ENGINE=InnoDB AUTO_INCREMENT=BIGNUM DEFAULT CHARSET=utf8;',
 'CREATE TABLE `btable` (',
 '  `btableid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,',
 "  `id` int(11) NOT NULL DEFAULT '0',",
 "  `fifthid` int(11) NOT NULL DEFAULT '0',",
 "  `type` int(4) NOT NULL DEFAULT '0',",
 '  `day` date DEFAULT NULL,',
 ') ENGINE=InnoDB AUTO_INCREMENT=3170072820 DEFAULT CHARSET=utf8',
 '/*!50100 PARTITION BY HASH (districtid)',
 'PARTITIONS 100 */;']

### if any preamble text get rid of it, start with first CREATE

In [10]:
def find_first_create(text,snippet):
    return [i for i,x in enumerate(text) if re.search(snippet, x) is not None][0] 
    
create_tabs = no_comment[find_first_create(no_comment, "CREATE"):]
create_tabs

['CREATE TABLE `atable` (',
 '  `id` bigint(20) NOT NULL AUTO_INCREMENT,',
 "  `anotherid` int(11) NOT NULL DEFAULT '0',",
 "  `thirdid` int(11) NOT NULL DEFAULT '0',",
 '  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,',
 '  `fourthid` int(11) DEFAULT NULL,',
 '  PRIMARY KEY (`anotherid`),',
 ') ENGINE=InnoDB AUTO_INCREMENT=BIGNUM DEFAULT CHARSET=utf8;',
 'CREATE TABLE `btable` (',
 '  `btableid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,',
 "  `id` int(11) NOT NULL DEFAULT '0',",
 "  `fifthid` int(11) NOT NULL DEFAULT '0',",
 "  `type` int(4) NOT NULL DEFAULT '0',",
 '  `day` date DEFAULT NULL,',
 ') ENGINE=InnoDB AUTO_INCREMENT=3170072820 DEFAULT CHARSET=utf8',
 '/*!50100 PARTITION BY HASH (districtid)',
 'PARTITIONS 100 */;']

### if code read in with readlines, then list needs to be pasted together, else if already text, just return

In [11]:
def str_type_check(obj):
    if type(obj)==list:
        return "".join(obj)
    else: return obj
    
str_type_check(create_tabs)   # example

"CREATE TABLE `atable` (  `id` bigint(20) NOT NULL AUTO_INCREMENT,  `anotherid` int(11) NOT NULL DEFAULT '0',  `thirdid` int(11) NOT NULL DEFAULT '0',  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `fourthid` int(11) DEFAULT NULL,  PRIMARY KEY (`anotherid`),) ENGINE=InnoDB AUTO_INCREMENT=BIGNUM DEFAULT CHARSET=utf8;CREATE TABLE `btable` (  `btableid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,  `id` int(11) NOT NULL DEFAULT '0',  `fifthid` int(11) NOT NULL DEFAULT '0',  `type` int(4) NOT NULL DEFAULT '0',  `day` date DEFAULT NULL,) ENGINE=InnoDB AUTO_INCREMENT=3170072820 DEFAULT CHARSET=utf8/*!50100 PARTITION BY HASH (districtid)PARTITIONS 100 */;"

### parse, tables/vars return as dict

In [15]:
import re
import pandas as pd
def create_table_to_dict(multi_text):
    multi_text=str_type_check(multi_text)
    sq = re.sub(r"CREATE TABLE `","`" ,multi_text.replace("\n",""))
    sq = [s for s in sq.split(";") if s is not '']
    q_list= [re.sub(',  PRIMARY.*','',q) for q in sq ]
    q_dict = {l[0]:l[1].strip() for l in [q.split(" ( ")  for q in q_list ]}
    return {k:v.split(r",  ") for k,v in q_dict.items()}

create_table_to_dict(create_tabs)   # example



{'`atable`': ['`id` bigint(20) NOT NULL AUTO_INCREMENT',
  "`anotherid` int(11) NOT NULL DEFAULT '0'",
  "`thirdid` int(11) NOT NULL DEFAULT '0'",
  '`timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP',
  '`fourthid` int(11) DEFAULT NULL'],
 '`btable`': ['`btableid` bigint(20) unsigned NOT NULL AUTO_INCREMENT',
  "`id` int(11) NOT NULL DEFAULT '0'",
  "`fifthid` int(11) NOT NULL DEFAULT '0'",
  "`type` int(4) NOT NULL DEFAULT '0'",
  '`day` date DEFAULT NULL,) ENGINE=InnoDB AUTO_INCREMENT=3170072820 DEFAULT CHARSET=utf8/*!50100 PARTITION BY HASH (districtid)PARTITIONS 100 */']}

## take dict and return as dataframe

In [25]:
def dict_to_df(dic_list):

  ky = []
  vl = []
  ty = []
  for k,v in dic_list.items():
    for x in v:
        xx = x.split(" ")[0:2]
        vl.append(xx[0].replace("`",""))
        ty.append(xx[1].split("(")[0])
        ky.append(k.replace("`",""))

  return pd.DataFrame({"tables":ky,"vars":vl, "types":ty})

def sql_to_spark(multi_text):
  dict_out =create_table_to_dict(multi_text)
  return dict_to_df(dict_out)

create_df = sql_to_spark(create_tabs) 
create_df # example

Unnamed: 0,tables,types,vars
0,btable,bigint,btableid
1,btable,int,id
2,btable,int,fifthid
3,btable,int,type
4,btable,date,day
5,atable,bigint,id
6,atable,int,anotherid
7,atable,int,thirdid
8,atable,timestamp,timestamp
9,atable,int,fourthid


### handle wrapping vars in type-casting code

In [18]:
def cast_parse(type_str,var_str):
    if "int" in type_str:
        return "int("+var_str+")"
    elif "float" in type_str:
        return "float("+var_str+")"
    elif "time" in type_str:
        return "parse("+var_str+")"
    elif "date" in type_str :
        return "parse("+var_str+")"
    else : #str_reg.search(type_str)
        return var_str
cast_parse("float", "real_num")  # example

'float(real_num)'

## Wondering below how I've handled enumerate, zip, and groupby to manage list comprehension

In [20]:
def col_splitter(colnames,coltypes):
    return ", ".join(
        [v[0]+'='+ cast_parse(v[1],'p['+str(i)+']')
         for i,v in enumerate(zip(colnames, coltypes))] )  # is this best way?

names = ["var1", "var2"]
types = ['float', 'timestamp']

col_splitter(names, types)    # example

'var1=float(p[0]), var2=parse(p[1])'

In [21]:
def col_orderer(colnames):
    return str([str(t) for t in colnames])

col_orderer(names)  # example

"['var1', 'var2']"

In [26]:
def split_name(data, delimiter="," ):
    
    # handle either data as sql script 
    # or already parsed sql script as pandas df
    if "pandas" in str(type(data)):
        df_tables = data
    else:
        df_tables = sql_to_spark(data)
        
    row_splitter = ".map(lambda k: k.split('"+delimiter+"'))"
    
    d={k: ".map(lambda p: Row(" + 
           col_splitter(v.vars,v.types) +
               ")))[" + 
           col_orderer(v.vars) + "]"
       for k,v in df_tables.groupby("tables")}    ### is this best way?
    
    return [k+"_df = sqlContext.createDataFrame(" +
            k + "_rdd" + row_splitter + p 
            for k,p in zip(d.keys(),d.values())]


### Desired pyspark createDataFrame commands

In [27]:
split_name(create_tabs) # ALSO FINE WITH         split_name(create_df)

["btable_df = sqlContext.createDataFrame(btable_rdd.map(lambda k: k.split(',')).map(lambda p: Row(btableid=int(p[0]), id=int(p[1]), fifthid=int(p[2]), type=int(p[3]), day=parse(p[4]))))[['btableid', 'id', 'fifthid', 'type', 'day']]",
 "atable_df = sqlContext.createDataFrame(atable_rdd.map(lambda k: k.split(',')).map(lambda p: Row(id=int(p[0]), anotherid=int(p[1]), thirdid=int(p[2]), timestamp=parse(p[3]), fourthid=int(p[4]))))[['id', 'anotherid', 'thirdid', 'timestamp', 'fourthid']]"]

In [29]:
split_name(create_df)

["btable_df = sqlContext.createDataFrame(btable_rdd.map(lambda k: k.split(',')).map(lambda p: Row(btableid=int(p[0]), id=int(p[1]), fifthid=int(p[2]), type=int(p[3]), day=parse(p[4]))))[['btableid', 'id', 'fifthid', 'type', 'day']]",
 "atable_df = sqlContext.createDataFrame(atable_rdd.map(lambda k: k.split(',')).map(lambda p: Row(id=int(p[0]), anotherid=int(p[1]), thirdid=int(p[2]), timestamp=parse(p[3]), fourthid=int(p[4]))))[['id', 'anotherid', 'thirdid', 'timestamp', 'fourthid']]"]