In [1]:
import json
import asyncio
import nest_asyncio
nest_asyncio.apply()
# loop = asyncio.get_event_loop()

from databases import Database
database = Database('mysql://gmaps:oIc1burIsKZYLRxKjZPN@database-1.chgxrutzqyvi.eu-central-1.rds.amazonaws.com:3306/innodb?charset=ascii')
await database.connect()
transaction = await database.transaction()

# Create tables

In [None]:
# Main table
sql_main_table = """
    CREATE TABLE IF NOT EXISTS commercial_premise (
        id INT NOT NULL AUTO_INCREMENT,
        name VARCHAR(100) NOT NULL UNIQUE,
        zip_code INT(5) NOT NULL,
        coordinates VARCHAR(25),
        telephone_number VARCHAR(25),
        opennig_hours VARCHAR(200),
        type VARCHAR(20) NOT NULL,
        score FLOAT(2) DEFAULT 0.0,
        total_scores INT(10) DEFAULT 0,
        price_range VARCHAR(5),
        style VARCHAR(20),
        address VARCHAR(40) NOT NULL,
        PRIMARY KEY(ID)
    )
"""

sql_comments = """
    CREATE TABLE IF NOT EXISTS commercial_premise_comments (
        id INT NOT NULL AUTO_INCREMENT,
        commercial_premise_id INT NOT NULL,
        content VARCHAR(200),
        PRIMARY KEY(id),
        INDEX prem_ind (commercial_premise_id),
        FOREIGN KEY (commercial_premise_id)
            REFERENCES commercial_premise(id)
            ON DELETE CASCADE
            ON UPDATE CASCADE
    )
"""

sql_ocupation = """
    CREATE TABLE IF NOT EXISTS commercial_premise_occupation (
        id INT NOT NULL AUTO_INCREMENT,
        commercial_premise_id INT NOT NULL,
        week_day VARCHAR(9) NOT NULL,
        time_period CHAR(2) NOT NULL,
        occupation FLOAT DEFAULT 0.0,
        PRIMARY KEY(id),
        INDEX prem_ind (commercial_premise_id),
        FOREIGN KEY (commercial_premise_id)
            REFERENCES commercial_premise(id)
            ON DELETE CASCADE
            ON UPDATE CASCADE
    )
"""

await database.execute("DROP TABLE IF EXISTS commercial_premise_occupation")
await database.execute("DROP TABLE IF EXISTS commercial_premise_comments")
await database.execute("DROP TABLE IF EXISTS commercial_premise")

await database.execute(sql_main_table)
await database.execute(sql_comments)
await database.execute(sql_ocupation)

await database.fetch_all("SHOW TABLES")

[('commercial_premise',),
 ('commercial_premise_comments',),
 ('commercial_premise_occupation',)]

In [None]:
from pprint import pprint
sql1 = "DESCRIBE commercial_premise"
sql2 = "DESCRIBE commercial_premise_comments"
sql3 = "DESCRIBE commercial_premise_occupation"
pprint(await database.fetch_all(sql1))
pprint(await database.fetch_all(sql2))
pprint(await database.fetch_all(sql3))

[('id', 'int(11)', 'NO', 'PRI', None, 'auto_increment'),
 ('name', 'varchar(100)', 'NO', 'UNI', None, ''),
 ('zip_code', 'int(5)', 'NO', '', None, ''),
 ('coordinates', 'varchar(25)', 'YES', '', None, ''),
 ('telephone_number', 'varchar(25)', 'YES', '', None, ''),
 ('opennig_hours', 'varchar(200)', 'YES', '', None, ''),
 ('type', 'varchar(20)', 'NO', '', None, ''),
 ('score', 'float', 'YES', '', '0', ''),
 ('total_scores', 'int(10)', 'YES', '', '0', ''),
 ('price_range', 'varchar(5)', 'YES', '', None, ''),
 ('style', 'varchar(20)', 'YES', '', None, ''),
 ('address', 'varchar(40)', 'NO', '', None, '')]
[('id', 'int(11)', 'NO', 'PRI', None, 'auto_increment'),
 ('commercial_premise_id', 'int(11)', 'NO', 'MUL', None, ''),
 ('content', 'varchar(200)', 'YES', '', None, '')]
[('id', 'int(11)', 'NO', 'PRI', None, 'auto_increment'),
 ('commercial_premise_id', 'int(11)', 'NO', 'MUL', None, ''),
 ('week_day', 'varchar(9)', 'NO', '', None, ''),
 ('time_period', 'char(2)', 'NO', '', None, ''),
 ('o

# Read data

In [4]:
# The scraped file holds non-ASCII names (e.g. "Café Bar Bilbao"). Decode it explicitly
# as UTF-8, the encoding JSON interchange requires, instead of the platform default.
with open("../data.json", "r", encoding="utf-8") as file:
    sample = json.load(file)

In [5]:
# List the premises in the scraped data: each top-level key is a premise name.
sample.keys()

dict_keys(['Bar Restaurante Víctor', 'Restaurante-Cocktail Yamike', 'Café Bar Bilbao', 'Aitxiar', 'La Olla de la Plaza Nueva', 'con B de bilbao', 'Restaurante Kalderapeko', 'Restaurante Berton Bukoi', 'Bar Claudio La Feria Del Jamón', 'Musume Izakaya', 'Vizcaya Bi', 'Lontz', 'La Salve', 'Xukela', 'Sorginzulo', 'BIKIAK - Bar de Pintxos - BILBAO', 'Casa Víctor Montes', 'El Ciervo', 'Tirauki', 'Batzoki Bilbo Zaharra'])

# Format data

In [6]:
def decompose_occupancy_data(occupancy_levels):
    """Parse scraped per-day occupancy strings into ``{week_day: {time_period: percent}}``.

    Args:
        occupancy_levels: Mapping of week-day name to a list of scraped strings
            (or ``None`` for a day without data). Recognised day names are
            "lunes", "martes", "miercoles", "jueves", "viernes", "sabado" and
            "domingo" (no accents). Each string is split on ":". The text after
            the first colon, up to a non-breaking space followed by "%", is the
            percentage. The text after the second colon, up to ")", is the time
            period. Presumably it looks like "<label>: 45 % (<label>: 13)";
            confirm against the scraper. The whole argument may be ``None``.

    Returns:
        A dict with all seven day names as keys. Each value maps the time-period
        string (e.g. "13") to the occupancy as a float. Unrecognised day names,
        ``None`` entries and strings that do not match the format are skipped.
    """
    week_days = ("lunes", "martes", "miercoles", "jueves", "viernes", "sabado", "domingo")
    occupancy = {week_day: {} for week_day in week_days}
    if not occupancy_levels:
        return occupancy

    for week_day, day_levels in occupancy_levels.items():
        day_occupancy = occupancy.get(week_day)
        # Day names outside the fixed list above are ignored, as are days without data.
        if day_occupancy is None or day_levels is None:
            continue
        for occupancy_level in day_levels:
            if occupancy_level is None:
                continue
            try:
                base = occupancy_level.split(":")[1:]
                time_period = base[1].split(")")[0].strip()
                percentage = float(base[0].split("\xa0%")[0])
            except (AttributeError, IndexError, ValueError):
                # Best-effort parsing: malformed or non-string entries are skipped,
                # without hiding unrelated errors such as KeyboardInterrupt.
                continue
            day_occupancy[time_period] = percentage
    return occupancy
        

In [7]:
def _split_address(raw_address):
    """Split a scraped "price · style · address" string into ``(price_range, style, address)``.

    With three "·"-separated parts, all three are filled. With two, ``price_range``
    is ``None``. A single part is stored as the address, with the other two ``None``
    (NOTE(review): assumed to be the street address — confirm against scraper output).
    """
    parts = [part.strip() for part in raw_address.split("·")]
    if len(parts) == 3:
        return parts[0], parts[1], parts[2]
    if len(parts) == 1:
        return None, None, parts[0]
    return None, parts[0], parts[1]


def _parse_score(raw_score):
    """Convert a scraped score such as "4,5" (decimal comma) to a float.

    Returns ``None`` (stored as NULL) when the score is missing or empty.
    """
    if raw_score is None or raw_score == "":
        return None
    return float(str(raw_score).replace(",", "."))


async def write_to_db(element, zip_code, premise_type):
    """Insert one scraped premise together with its comments and occupancy rows.

    Args:
        element: Scraped record for one premise. "name" and "address" are
            required. "coordinates", "telephone_number", "opennig_hours",
            "score", "total_scores", "comments" and "occupancy" are optional.
        zip_code: Postal code for ``commercial_premise.zip_code`` (e.g. "48005");
            converted with ``int()`` to match the INT column.
        premise_type: Value for ``commercial_premise.type`` (e.g. "restaurante").

    Returns:
        "OK" once every insert has completed.
    """
    premise_sql = """
        INSERT INTO commercial_premise 
            (name, zip_code, coordinates, telephone_number, opennig_hours, type, score, total_scores, price_range, style, address) 
            VALUES (:name, :zip_code, :coordinates, :telephone_number, :opennig_hours, :type, :score, :total_scores, :price_range, :style, :address)
        """
    comment_sql = """
        INSERT INTO commercial_premise_comments
        (commercial_premise_id, content)
        VALUES (:commercial_premise_id, :content)
    """
    occupancy_sql = """
        INSERT INTO commercial_premise_occupation
        (
            commercial_premise_id, week_day,
            time_period, occupation
        )
        VALUES (:commercial_premise_id, :week_day, :time_period, :occupation)
    """

    price_range, style, address = _split_address(element["address"])
    opening_hours = element.get("opennig_hours")
    premise_values = {
        "name": element["name"],
        "zip_code": int(zip_code),
        "coordinates": element.get("coordinates"),
        "telephone_number": element.get("telephone_number"),
        "opennig_hours": ",".join(opening_hours) if opening_hours is not None else None,
        "type": premise_type,
        "score": _parse_score(element.get("score")),
        "total_scores": element.get("total_scores"),
        "price_range": price_range,
        "style": style,
        "address": address,
    }
    # The result is used as the parent id of the child rows below
    # (assumes execute() returns the new row's AUTO_INCREMENT id).
    premise_id = await database.execute(premise_sql, premise_values)

    comment_rows = [
        {"commercial_premise_id": premise_id, "content": comment}
        for comment in element.get("comments") or []
    ]
    if comment_rows:
        # execute_many is a coroutine: without `await` nothing is ever written.
        await database.execute_many(comment_sql, comment_rows)

    raw_occupancy = element.get("occupancy")
    if raw_occupancy is not None:
        occupancy_rows = [
            {
                "commercial_premise_id": premise_id,
                "week_day": week_day,
                "time_period": time_period,
                "occupation": level,
            }
            for week_day, periods in decompose_occupancy_data(raw_occupancy).items()
            for time_period, level in (periods or {}).items()
        ]
        if occupancy_rows:
            await database.execute_many(occupancy_sql, occupancy_rows)
    return "OK"

In [8]:
import time

In [None]:
# Zip code and premise type applied to every record in `sample`.
ZIP_CODE = "48005"
PREMISE_TYPE = "restaurante"

# nest_asyncio (applied in the first cell) lets run_until_complete be called on
# this loop even while Jupyter's own loop is running.
loop = asyncio.get_event_loop()

t1 = time.perf_counter()  # monotonic clock, suited to measuring elapsed time
# Sequential on purpose: each premise is fully written before the next one starts.
for element in sample.values():
    # run_until_complete accepts a coroutine directly; no ensure_future wrapper needed.
    loop.run_until_complete(write_to_db(element, ZIP_CODE, PREMISE_TYPE))
t2 = time.perf_counter()
print(t2 - t1)



TypeError: An asyncio.Future, a coroutine or an awaitable is required