In [1]:
from datetime import datetime, date, timedelta
from typing import Dict, List
import os
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.ext.declarative import declared_attr
import uuid
from sqlalchemy.dialects.postgresql import UUID


# Minimal Flask application that hosts the SQLAlchemy experiments in the cells below.
app = Flask(__name__)

# "sqlite:///" carries no file path; under SQLAlchemy's SQLite URL rules this
# should select an in-memory database, so data does not outlive the process.
# NOTE(review): confirm against the installed SQLAlchemy / Flask-SQLAlchemy versions.
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite:///"
# Switch off Flask-SQLAlchemy's modification-tracking option (not used in this notebook).
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

# Shared Flask-SQLAlchemy handle; the model classes in later cells derive from db.Model.
db = SQLAlchemy(app)


In [22]:
import pandas as pd
def determine_file_type(file_path_in) -> str:
    """Classify a semicolon-delimited CSV file by the column names in its header.

    The file is loaded through ``InputService.read_file_path_into_df`` (UTF-8,
    ``;`` delimiter). Its column names are then compared with the known layouts
    in a fixed order; the first layout whose required columns are all present
    wins.

    Args:
        file_path_in: Path of the CSV file to inspect.

    Returns:
        One of ``'account_list'``, ``'distance_sale_list'``, ``'item_list'`` or
        ``'vat_numbers'``.

    Raises:
        ValueError: If the columns match none of the known layouts. Previously
            this case printed a placeholder message and returned ``None``,
            contradicting the ``-> str`` annotation.
    """
    df = InputService.read_file_path_into_df(file_path_in, df_encoding='utf-8', delimiter=';')
    column_names = set(df.columns.values)

    # Ordered (file type, required columns) pairs; order matters because the
    # first full match is returned.
    # NOTE(review): the account_list test used to check 'channel_code' twice;
    # a second, different column was presumably intended -- confirm which one.
    signatures = (
        ('account_list', ('channel_code',)),
        ('distance_sale_list', ('arrival_country_code', 'active')),
        ('item_list', ('SKU', 'name', 'tax_code', 'unit_cost_price_currency_code')),
        ('vat_numbers', ('country_code', 'number')),
    )

    for file_type, required_columns in signatures:
        if all(column in column_names for column in required_columns):
            return file_type

    raise ValueError(
        'Cannot determine file type of {!r}; columns found: {}'.format(
            file_path_in, sorted(map(str, column_names))))

def determine_data_type(file_type: str) -> str:
    """Map a file type to its data category.

    Args:
        file_type: A file type name such as the ones returned by
            ``determine_file_type``.

    Returns:
        ``'static'`` for ``'account_list'``, ``'distance_sale_list'``,
        ``'item_list'`` and ``'vat_numbers'``.

    Raises:
        ValueError: If ``file_type`` is not one of the known file types.
    """
    static_file_types = ('account_list', 'distance_sale_list', 'item_list', 'vat_numbers')
    if file_type in static_file_types:
        return 'static'

    # The previous bare ``raise`` had no active exception to re-raise, so it
    # surfaced as "RuntimeError: No active exception to reraise" and hid the
    # real problem; name the offending value instead.
    raise ValueError('Unknown file type: {!r}'.format(file_type))

class InputService:
    """Helpers for loading input files into pandas DataFrames."""

    @staticmethod
    def read_file_path_into_df(file_path: str, df_encoding: str, delimiter: str) -> pd.DataFrame:
        """Read a ``.csv`` file into a DataFrame and drop rows that are entirely empty.

        The file name and the head of the resulting frame are printed as
        progress output.

        Args:
            file_path: Path of the file to read.
            df_encoding: Text encoding passed to ``pandas.read_csv``.
            delimiter: Field delimiter passed to ``pandas.read_csv``.

        Returns:
            The parsed DataFrame without all-NaN rows.

        Raises:
            FileNotFoundError: If ``file_path`` is not an existing regular file.
            UnsupportedMediaType: If the file name does not end in ``.csv`` or
                the file cannot be parsed.
                NOTE(review): ``UnsupportedMediaType`` is not imported in the
                visible cells; presumably it is ``werkzeug.exceptions.UnsupportedMediaType``
                -- confirm it is in scope.
        """
        if not os.path.isfile(file_path):
            # Previously a bare ``raise`` with no active exception, which only
            # produced an unrelated RuntimeError.
            raise FileNotFoundError('Not a file: {}'.format(file_path))

        file_name = os.path.basename(file_path)
        print('file_name: ', file_name, flush=True)

        # The extension check sits outside the ``try`` so that its specific
        # error is no longer caught and re-labelled as "Cannot read file".
        if not file_name.lower().endswith('.csv'):
            print('Case "Read File Path: File extension invalid"', flush=True)
            raise UnsupportedMediaType(
                'File extension invalid (file: {}).'.format(file_name))

        try:
            df_raw = pd.read_csv(file_path, encoding=df_encoding, delimiter=delimiter)
            df = df_raw.dropna(how='all')
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
            # and SystemExit still propagate; the cause is chained for debugging.
            print('Case "Read File Path: Cannot read file"', flush=True)
            raise UnsupportedMediaType(
                'Cannot read file {}.'.format(file_name)) from exc

        print('df.head()', flush=True)
        print(df.head(), flush=True)
        return df


In [23]:
# NOTE(review): absolute, machine-specific path; this cell only runs where this
# exact file exists. Consider a path relative to a configurable data directory.
file_path_in = '/Users/tm/Projects/NTAMAZON/webapp/api/data/test/GOLDEN FIRE/distance_sales_filled.csv'

In [24]:
# Classify the file from its column names, then map that file type to a data category.
file_type = determine_file_type(file_path_in)
data_type = determine_data_type(file_type)

file_name:  distance_sales_filled.csv
df.head()
   seller_firm_public_id  valid_from  valid_to arrival_country_code  active
0                    NaN  01.06.2018       NaN                   CZ    True
1                    NaN  01.06.2018       NaN                   DE    True
2                    NaN  01.06.2018       NaN                   ES    True
3                    NaN  01.06.2018       NaN                   FR    True
4                    NaN  01.06.2018       NaN                   GB    True


In [25]:
# Show both classification results for the sample file.
print('file_type: ', file_type)
print('data_type: ', data_type)

file_type:  distance_sale_list
data_type:  static


In [8]:
from urllib import request
import xml.etree.ElementTree as ET

In [5]:

def send_request_via_urllib(country_code: str, number: str, timeout: float = 30.0):
    """Check a VAT number with the VIES ``checkVat`` SOAP service using only the stdlib.

    Args:
        country_code: Country code sent as ``countryCode`` in the request.
        number: VAT number sent as ``vatNumber`` in the request.
        timeout: Seconds to wait for the web service before giving up. Added so
            that an unresponsive service cannot block the caller forever.

    Returns:
        A dict with the keys
        ``'valid'`` (bool; True only if the response contains ``<valid>true</valid>``),
        ``'request_date'`` (``datetime.date`` or None),
        ``'name'`` (str or None) and ``'address'`` (str or None).

    Raises:
        urllib.error.URLError: If the request fails or times out.
        xml.etree.ElementTree.ParseError: If the response body is not valid XML.
    """
    #adapted from here: https://github.com/ajcerejeira/vat-validator/blob/master/vat_validator/vies.py
    from urllib import request
    import xml.etree.ElementTree as ET

    namespace = "urn:ec.europa.eu:taxud:vies:services:checkVat:types"

    # Prepare the SOAP request
    envelope = ET.Element(
        "soapenv:Envelope",
        attrib={
            "xmlns:hs": namespace,
            "xmlns:soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
        },
    )
    body = ET.SubElement(envelope, "soapenv:Body")
    action = ET.SubElement(body, "hs:checkVat")
    ET.SubElement(action, "hs:countryCode").text = country_code
    ET.SubElement(action, "hs:vatNumber").text = number
    payload = ET.tostring(envelope, encoding="utf-8")

    # Send the SOAP request to VIES Webservice. The ``with`` block closes the
    # HTTP response even if reading or parsing fails (it was never closed before).
    url = "http://ec.europa.eu/taxation_customs/vies/services/checkVatService"
    req = request.Request(url, data=payload)
    with request.urlopen(req, timeout=timeout) as res:
        res_envelope = ET.fromstring(res.read())

    # Parse the result
    def find_field(tag):
        """Return the first response element called ``tag`` in the VIES namespace, or None."""
        return res_envelope.find(".//{{{}}}{}".format(namespace, tag))

    request_date = find_field("requestDate")
    valid = find_field("valid")
    name = find_field("name")
    address = find_field("address")

    response_object = {
        'valid': valid is not None and valid.text == "true",
        # The date is parsed with a UTC offset suffix ("%z") and reduced to a plain date.
        'request_date': datetime.strptime(request_date.text, "%Y-%m-%d%z").date()
                        if request_date is not None and request_date.text is not None
                        else None,
        'name': name.text if name is not None else None,
        'address': address.text if address is not None else None
    }

    return response_object

In [29]:
# NOTE(review): check_vat is not defined in any visible cell; it presumably comes
# from an earlier, since-removed cell (hidden kernel state) -- confirm, or call
# send_request_via_urllib / send_request_via_zeep instead.
response_object = check_vat('CZ', '684811557')

In [30]:
# Display the response dict produced by the VAT check in the previous cell.
response_object

{'country_code': 'CZ',
 'vat': '684811557',
 'valid': True,
 'request_date': datetime.date(2020, 7, 11),
 'name': 'FRIAS SEPULVEDA A R Y GUZMAN SEPULVEDA M A',
 'address': 'Cl Monasterio de Irache 4\n31591 CORELLA (NAVARRA)\n '}

In [35]:
def send_request_via_zeep(country_code: str, number: str):
    """Check a VAT number with the VIES ``checkVat`` operation through a zeep SOAP client.

    Args:
        country_code: Country code passed as the first ``checkVat`` argument.
        number: VAT number passed as the second ``checkVat`` argument.

    Returns:
        A dict with the keys ``'valid'``, ``'request_date'`` (the response's
        ``requestDate`` if it is a ``date``, otherwise None), ``'name'`` and
        ``'address'``. A name or address equal to the placeholder ``'---'`` is
        reported as None.

    Note:
        ``VIES_WSDL_URL`` is read from module scope.
        NOTE(review): it is not defined in the visible cells -- confirm it is
        set before this function is called.
    """
    from zeep import Client, helpers

    client = Client(VIES_WSDL_URL)
    vat_zeep_object = client.service.checkVat(country_code, number)
    vat = helpers.serialize_object(vat_zeep_object)

    # The key must be the string itself: the earlier ``vat.get(['requestDate'])``
    # passed a list and failed with "TypeError: unhashable type: 'list'".
    request_date = vat.get('requestDate')

    response_object = {
        'valid': vat['valid'],
        'request_date': request_date if isinstance(request_date, date) else None,
        # 'name' and 'address' each read their own field (they were swapped before).
        'name': None if vat['name'] == '---' else vat['name'],
        'address': None if vat['address'] == '---' else vat['address']
    }

    return response_object

In [9]:
# Live call against the VIES web service (needs network access).
response_object = send_request_via_urllib('GB', '235032544')

In [10]:
# Display the dict returned by send_request_via_urllib in the previous cell.
response_object

{'valid': True,
 'request_date': datetime.date(2020, 7, 11),
 'name': 'CARSTENS & ROBINSON LTD',
 'address': '26 OKEHURST ROAD\nEASTBOURNE\nEAST SUSSEX\n\n\nBN21 1QP'}

In [31]:
from datetime import date, datetime
from time import sleep

In [32]:
# Quick check that sleep() pauses the cell for four seconds between the two prints.
print("first")
sleep(4)
print("second")

first
second


In [2]:
class User(db.Model):  # type: ignore
    """User table with a name and a level, plus helpers for bulk attribute updates."""
    __tablename__ = "user"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String)
    level = db.Column(db.Integer)

    def __repr__(self):
        template = '<User: {} | Name: {} | Level: {}>'
        return template.format(self.id, self.name, self.level)

    def update(self, data_changes):
        """Set every attribute named in ``data_changes`` to its mapped value; return self."""
        for attribute, new_value in data_changes.items():
            setattr(self, attribute, new_value)
        return self

    def update_diverging(self, data_changes):
        """Apply only the entries of ``data_changes`` that are not None and differ
        from the current attribute value; return self.
        """
        for attribute, new_value in data_changes.items():
            # None means "no change requested" and is skipped before the
            # current value is even looked up.
            if new_value is None:
                continue
            if new_value != getattr(self, attribute):
                setattr(self, attribute, new_value)
        return self
    

In [4]:
# Two users with different names and levels to exercise the update helpers.
u1 = User(name='Thomas', level=5)
u2 = User(name='Tobi', level=2)

# Persist both users; the printed ids below are populated after this commit.
db.session.add(u1)
db.session.add(u2)
db.session.commit()

# Only 'name' is supplied, so 'level' is not touched by the update in the next cell.
data_changes = {
    'name': 'Egon'
}

print(u1)
print(u2)

<User: 1 | Name: Thomas | Level: 5>
<User: 2 | Name: Tobi | Level: 2>


In [5]:
# Apply the same change set to both users; only attributes whose value differs are written.
u1.update_diverging(data_changes)
u2.update_diverging(data_changes)

# Flush the attribute changes to the database.
db.session.commit()

In [6]:
# Both names should now read 'Egon' while the levels stay as they were.
print(u1)
print(u2)

<User: 1 | Name: Egon | Level: 5>
<User: 2 | Name: Egon | Level: 2>


In [2]:
class Business(db.Model):  # type: ignore
    """ Business model: references its creating user and lists the users it employs. """
    __tablename__ = "business"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(24))
    # The callable itself is passed (not its result), so the timestamp is
    # produced when a row is inserted rather than when the class is defined.
    created_on = db.Column(db.DateTime, default=datetime.utcnow)
    # Foreign key to the user that created this business.
    created_by = db.Column(db.Integer,  db.ForeignKey('user.id', name='fk_user_business_created_by'))
    # business and user reference each other through two different foreign keys,
    # so the join condition is spelled out; the backref adds User.employer.
    employees = db.relationship('User', backref="employer", primaryjoin='User.employer_id==Business.id')

    
    
class User(db.Model):  # type: ignore
    """ User model: employed by one business, may have created businesses, owns items. """
    # NOTE(review): another User class mapped to table "user" is defined in an
    # earlier cell with different columns; presumably the two were run in
    # separate kernel sessions -- confirm.
    __tablename__ = "user"

    id = db.Column(db.Integer, primary_key=True)
    # Foreign key to the business that employs this user.
    employer_id = db.Column(db.Integer, db.ForeignKey('business.id', name='fk_business_user_employer_id'))
    # Businesses created by this user, newest first; the backref adds Business.creator.
    # post_update=True is SQLAlchemy's option for setting the foreign key in a
    # separate UPDATE, which helps with the mutual business <-> user references.
    created_businesses = db.relationship('Business', backref='creator', order_by="desc(Business.created_on)", primaryjoin='Business.created_by==User.id', post_update=True)
    # Items belonging to this user; the backref adds Item.user.
    items = db.relationship('Item', backref="user", lazy='select')

    
    
class Item(db.Model):  # type: ignore
    """ Item model: a row linked to a user through user_id. """
    __tablename__ = "item"

    id = db.Column(db.Integer, primary_key=True)
    # Foreign key to the owning user (nullable, so an item may exist without one).
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', name='fk_user_item_user_id'))
    
    

In [3]:
# Reset the schema: release the session, drop every table, then recreate them.
#db.reflect()
# NOTE(review): the message says "committing" but the call below removes the
# session; per the linked answer this is done so that drop_all() does not hang.
print("committing") #https://stackoverflow.com/questions/24289808/drop-all-freezes-in-flask-with-sqlalchemy
db.session.remove()

print("Dopping all")
db.drop_all()

print("Creating all")
db.create_all()

committing
Dopping all
Creating all


In [4]:
# Fixture objects: two businesses, two users, two items.
b1 = Business(name="b1")
b2 = Business(name="b2")

u1 = User()
u2 = User()

i1 = Item()
i2 = Item()

db.session.add(b1)
db.session.add(b2)
db.session.add(u1)
db.session.add(u2)
db.session.add(i1)
db.session.add(i2)

# First commit: persist all rows before wiring up the relationships.
db.session.commit()

# Wire the relationships through the collection side of each one.
b1.employees.append(u1)
b1.employees.append(u2)
u1.created_businesses.append(b1)
u2.created_businesses.append(b2)
u1.items.append(i1)
u1.items.append(i2)
db.session.commit()

# Read each relationship back, including the backref (u1.employer) and the raw
# foreign key (b2.created_by, set by appending b2 to u2.created_businesses).
print(b1.employees)
print(u1.employer.name)
print(u1.created_businesses)
print(b2.created_by)
print(u1.items)


[<User 1>, <User 2>]
b1
[<Business 1>]
2
[<Item 1>, <Item 2>]
