Commit
Update origin: Add ratio support and a number of algorithm improvements
dyang415 committed Sep 15, 2023
1 parent 5b87c29 commit c092069
Showing 75 changed files with 3,125 additions and 1,746 deletions.
198 changes: 23 additions & 175 deletions backend/app/__init__.py
@@ -1,25 +1,27 @@
import json
import os
from dataclasses import asdict
from datetime import datetime

import pandas as pd
import polars
import sentry_sdk
from app.data_source import bp as data_source_bp
from app.file_upload.services.file_upload import FileUploadService
from app.insight.datasource.bqMetrics import BqMetrics
from app.insight.services.metrics import MetricsController, NpEncoder
from app.insight.services.segmentInsight import get_segment_insight
from config import Config
from flask import Flask, request
from flask import Flask
from flask_appbuilder import SQLA, AppBuilder
from flask_cors import CORS
from loguru import logger
from orjson import orjson
from sentry_sdk.integrations.flask import FlaskIntegration

flask_env_value = os.environ.get('FLASK_ENV', '')
if flask_env_value != 'development':
from app.index_view import DSenseiIndexView
from config import ConfigKey, get_config

basedir = os.path.abspath(os.path.dirname(__file__))

app = Flask(__name__, static_url_path='')
app.config.from_object(get_config(os.environ.get("FLASK_ENV", "production")))
app.config.from_prefixed_env("FLASK")
app.config.from_prefixed_env("DSENSEI")
CORS(app)
app._static_folder = os.path.abspath("static/")

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'app.db')
app.config['CSRF_ENABLED'] = True

if app.config[ConfigKey.ENABLE_TELEMETRY.name]:
sentry_sdk.init(
dsn="https://196e3946ca25bbb9c939c14a7daa2da8@o4505710546190336.ingest.sentry.io/4505711370698752",
integrations=[
@@ -29,163 +31,9 @@
traces_sample_rate=1.0,
include_local_variables=False
)
app = Flask(__name__, static_url_path='')

app.config.from_object(Config)
app.register_blueprint(data_source_bp)
CORS(app)
app._static_folder = os.path.abspath("static/")

agg_method_map = {
"sum": "sum",
"count": "count",
"distinct": "nunique"
}
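
The map translates the UI's aggregation options into pandas aggregation method names ("distinct" becomes nunique). A quick illustration of applying a mapped value (this snippet is illustrative, not part of the commit):

import pandas as pd

# Illustrative only: apply the mapped aggregation name to a column.
df = pd.DataFrame({"user_id": [1, 1, 2]})
distinct_users = df["user_id"].agg(agg_method_map["distinct"])  # "nunique" -> 2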


@app.route('/')
def main():
return app.send_static_file('index.html')


@app.route('/dashboard')
def dashboard():
return app.send_static_file('index.html')


def parse_data(data):
baseDateRange = data['baseDateRange']
comparisonDateRange = data['comparisonDateRange']
date_column = data['dateColumn']
# TODO(liuyl): Fix this; the frontend does not currently pass this value to the backend.
date_column_type = data['dateColumnType'] if 'dateColumnType' in data else 'date'
group_by_columns = data['groupByColumns']
metric_column = data['metricColumn']

baselineStart = datetime.strptime(
baseDateRange['from'], '%Y-%m-%dT%H:%M:%S.%fZ').date()
baselineEnd = datetime.strptime(
baseDateRange['to'], '%Y-%m-%dT%H:%M:%S.%fZ').date()
comparisonStart = datetime.strptime(
comparisonDateRange['from'], '%Y-%m-%dT%H:%M:%S.%fZ').date()
comparisonEnd = datetime.strptime(
comparisonDateRange['to'], '%Y-%m-%dT%H:%M:%S.%fZ').date()

metrics_name = {metric_column['columnNames'][0]: metric_column['columnNames'][0]}
agg_method = {metric_column['columnNames'][0]: metric_column['aggregationOption']}
metrics_name.update({date_column: 'count'})
agg_method.update({date_column: 'count'})

# TODO: Fix this; the frontend does not currently pass this value to the backend.
expected_value = 0

return (baselineStart, baselineEnd, comparisonStart, comparisonEnd, date_column, date_column_type, agg_method, metrics_name, group_by_columns, expected_value)
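
parse_data expects the JSON body posted by the frontend; a minimal example of the shape it assumes (all field values here are hypothetical):

# Hypothetical payload; field names match what parse_data reads.
data = {
    "baseDateRange": {"from": "2023-08-01T00:00:00.000Z", "to": "2023-08-31T00:00:00.000Z"},
    "comparisonDateRange": {"from": "2023-09-01T00:00:00.000Z", "to": "2023-09-14T00:00:00.000Z"},
    "dateColumn": "created_at",
    "groupByColumns": ["country", "platform"],
    "metricColumn": {"columnNames": ["revenue"], "aggregationOption": "sum"},
}
(baselineStart, baselineEnd, comparisonStart, comparisonEnd,
 date_column, date_column_type, agg_method, metrics_name,
 group_by_columns, expected_value) = parse_data(data)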


@app.route('/api/bqinsight', methods=['POST'])
def getBqInsight():
data = request.get_json()
table_name = data['tableName']

(baselineStart, baselineEnd, comparisonStart, comparisonEnd, date_column, date_column_type,
agg_method, metrics_name, group_by_columns, expected_value) = parse_data(data)

bq_metric = BqMetrics(
table_name=table_name,
baseline_period=(baselineStart, baselineEnd),
comparison_period=(comparisonStart, comparisonEnd),
date_column=date_column,
date_column_type=date_column_type,
agg_method=agg_method,
metrics_name=metrics_name,
columns=group_by_columns,
expected_value=expected_value)
return bq_metric.get_metrics()


@app.route('/api/segment-insight', methods=['POST'])
def get_time_series():
data = request.get_json()

fileId = data['fileId']

(baselineStart, baselineEnd, comparisonStart, comparisonEnd, date_column, date_column_type,
agg_method, metrics_name, group_by_columns, expected_value) = parse_data(data)

segment_key = data['segmentKey']
filtering_clause = polars.lit(True)
for sub_key in segment_key:
filtering_clause = filtering_clause & (polars.col(
sub_key['dimension']).cast(str).eq(polars.lit(sub_key['value'])))

df = polars.read_csv(f'/tmp/dsensei/{fileId}') \
.with_columns(polars.col(date_column).str.slice(0, 10).str.to_date().alias("date")) \
.filter(filtering_clause)

return orjson.dumps(
get_segment_insight(
df,
date_column,
(baselineStart, baselineEnd),
(comparisonStart, comparisonEnd),
agg_method,
metrics_name
)
)
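
The filter is a conjunction: each (dimension, value) pair in segmentKey narrows the frame further, with columns cast to string before comparison. The same pattern in isolation (sample data is hypothetical):

import polars

# Hypothetical segment key: country == "US" AND platform == "ios".
segment_key = [{"dimension": "country", "value": "US"},
               {"dimension": "platform", "value": "ios"}]
clause = polars.lit(True)
for sub_key in segment_key:
    clause = clause & polars.col(sub_key["dimension"]).cast(str).eq(polars.lit(sub_key["value"]))

df = polars.DataFrame({"country": ["US", "DE"], "platform": ["ios", "ios"]})
print(df.filter(clause))  # only the US/ios row survives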


@app.route('/api/insight', methods=['POST'])
def getInsight():
data = request.get_json()
file_id = data['fileId']

(baselineStart, baselineEnd, comparisonStart, comparisonEnd, date_column, date_column_type,
agg_method, metrics_name, group_by_columns, expected_value) = parse_data(data)

logger.info('Reading file')
df = polars.read_csv(f'/tmp/dsensei/{file_id}') \
.with_columns(polars.col(date_column).str.slice(0, 10).str.to_date().alias("date"))
logger.info('File loaded')

metrics = MetricsController(
df,
(baselineStart, baselineEnd),
(comparisonStart, comparisonEnd),
date_column,
group_by_columns,
agg_method,
metrics_name,
expected_value
)

return metrics.getMetrics()


@app.route('/api/file_upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return "No file part in the request", 400

file = request.files['file']
if file.filename == '':
return "No selected file", 400

try:
file_data = file.read()
output_dir = "/tmp/dsensei" # Change the output directory if needed

# Create an instance of FileProcessor and use its methods
file_processor = FileUploadService()
md5 = file_processor.save_file_with_md5(file_data, output_dir)
return json.dumps({'id': md5}), 200
except FileExistsError as e:
return str(e), 409
except Exception as e:
print(e)
return str(e), 500
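
The 409 path implies that FileUploadService stores uploads content-addressed by MD5 and raises FileExistsError on duplicates. A minimal sketch of that contract (the service's internals are not shown in this diff, so this is an assumption based on the route's behavior):

import hashlib
import os

# Sketch only: content-addressed save mirroring the route's FileExistsError handling.
def save_file_with_md5(file_data: bytes, output_dir: str) -> str:
    md5 = hashlib.md5(file_data).hexdigest()
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, md5)
    if os.path.exists(path):
        raise FileExistsError(f"File {md5} already exists")
    with open(path, "wb") as f:
        f.write(file_data)
    return md5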


if __name__ == '__main__':
app.run(processes=4, port=5001)
db = SQLA(app)
appbuilder = AppBuilder(app, db.session, indexview=DSenseiIndexView)
from app.data_source import routes
from app.insight import routes
from app.settings import routes
Binary file added backend/app/app.db
Binary file not shown.
File renamed without changes.
7 changes: 7 additions & 0 deletions backend/app/common/request_utils.py
@@ -0,0 +1,7 @@
from orjson import orjson


def build_error_response(error: str) -> bytes:
return orjson.dumps({
'error': error
})
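
The helper keeps error payloads uniform across routes. Note that orjson.dumps returns bytes, which Flask accepts as a response body:

# Illustrative usage: routes pair the body with an HTTP status code.
body = build_error_response('Table not found.')  # b'{"error":"Table not found."}'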
6 changes: 0 additions & 6 deletions backend/app/data_source/__init__.py
@@ -1,7 +1 @@
from flask import Blueprint
from flask_cors import CORS, cross_origin

bp = Blueprint('data-source', __name__, url_prefix='/api/data-source')
CORS(bp)

from app.data_source import routes
Empty file.
38 changes: 38 additions & 0 deletions backend/app/data_source/bigquery/api.py
@@ -0,0 +1,38 @@
import json

from flask_appbuilder import expose
from flask_appbuilder.api import BaseApi
from google.api_core.exceptions import NotFound
from google.auth.exceptions import GoogleAuthError
from loguru import logger
from orjson import orjson

from app.common.request_utils import build_error_response
from app.data_source.bigquery.bigquery_source import BigquerySource


class BigQuerySourceApi(BaseApi):
resource_name = 'source/bigquery'
bigquery_source = BigquerySource()

@expose('/schema/<full_name>', methods=['GET'])
def get_schema(self, full_name: str):
try:
return orjson.dumps(self.bigquery_source.get_schema(full_name))
except NotFound as e:
return build_error_response('Table not found.'), 404
except GoogleAuthError as e:
return build_error_response('Auth failed.'), 403
except Exception as e:
logger.exception(e)
return build_error_response('Internal server error.'), 500

@expose('/dataset', methods=['GET'])
def list_datasets(self):
try:
return orjson.dumps(self.bigquery_source.list_dataset())
except GoogleAuthError as e:
return build_error_response(str(e)), 403
except Exception as e:
logger.exception(e)
return json.dumps({'error': 'Internal server error.'}), 500
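
With flask-appbuilder's default routing, a BaseApi whose resource_name is 'source/bigquery' is served under /api/v1/source/bigquery. A hypothetical client call (host, port, and table name are placeholders):

import requests

# Assumes FAB's default /api/v1 prefix; the host and table are placeholders.
resp = requests.get("http://localhost:5001/api/v1/source/bigquery/schema/my-project.sales.orders")
schema = resp.json() if resp.ok else None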
@@ -1,10 +1,10 @@
from concurrent.futures import ThreadPoolExecutor, wait
from pprint import pprint

from app.data_source.datasource import BigquerySchema, Dataset, Field
from google.cloud import bigquery
from google.cloud.bigquery.table import RowIterator

from app.data_source.models import Field, Dataset, BigquerySchema

query_executor = ThreadPoolExecutor(max_workers=10)


15 changes: 0 additions & 15 deletions backend/app/data_source/datasource/csvSource.py

This file was deleted.

Empty file.
37 changes: 37 additions & 0 deletions backend/app/data_source/file/api.py
@@ -0,0 +1,37 @@
from flask import request
from flask_appbuilder.api import BaseApi, expose
from loguru import logger
from orjson import orjson

from app.common.request_utils import build_error_response
from app.data_source.file.file_source import FileSource
from app.data_source.file.file_upload_service import FileUploadService


class FileSourceApi(BaseApi):
resource_name = 'source/file'

@expose('/schema', methods=['POST'])
def load_schema(self):
logger.info("Loading file from request")
if 'file' not in request.files:
return build_error_response("No file part in the request"), 400
file = request.files['file']
if file.filename == '':
return build_error_response("No selected file"), 400

try:
logger.info("Saving file to disk")
file_data = file.read()
output_dir = "/tmp/dsensei"

file_processor = FileUploadService()
md5 = file_processor.save_file_with_md5(file_data, output_dir)

schema = FileSource(md5).load_schema()
return orjson.dumps(schema), 200
except FileExistsError as e:
return build_error_response(str(e)), 409
except Exception as e:
logger.exception(e)
return build_error_response(str(e)), 500
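
The endpoint persists the upload (keyed by MD5) and returns the inferred schema in one round trip. A hypothetical client upload (host, port, and file are placeholders):

import requests

# Assumes FAB's default /api/v1 prefix; host and file are placeholders.
with open("orders.csv", "rb") as f:
    resp = requests.post("http://localhost:5001/api/v1/source/file/schema",
                         files={"file": ("orders.csv", f)})
print(resp.status_code, resp.json())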