Clean logging #83

Merged (2 commits, Sep 30, 2021)
2 changes: 2 additions & 0 deletions .gitignore
@@ -10,6 +10,7 @@ coverage.xml
scala/tempo/target
scala/tempo/project/target/
scala/tempo/project/project/target/
.bsp

# local delta tables
**/spark-warehouse
@@ -19,6 +20,7 @@ scala/tempo/project/project/target/
**/dist
**/htmlcov
**/tempo.egg-info
**/dbl_tempo.egg-info

## Python related files
*.pyc
183 changes: 0 additions & 183 deletions python/dbl_tempo.egg-info/PKG-INFO

This file was deleted.

11 changes: 0 additions & 11 deletions python/dbl_tempo.egg-info/SOURCES.txt

This file was deleted.

1 change: 0 additions & 1 deletion python/dbl_tempo.egg-info/dependency_links.txt

This file was deleted.

3 changes: 0 additions & 3 deletions python/dbl_tempo.egg-info/requires.txt

This file was deleted.

1 change: 0 additions & 1 deletion python/dbl_tempo.egg-info/top_level.txt

This file was deleted.

10 changes: 6 additions & 4 deletions python/tempo/io.py
@@ -1,6 +1,9 @@
import pyspark.sql.functions as f
import os
import logging
from collections import deque

import pyspark.sql.functions as f

def write(tsdf, spark, tabName, optimizationCols = None):
"""
param: tsdf: input TSDF object to write
@@ -18,7 +21,6 @@ def write(tsdf, spark, tabName, optimizationCols = None):
else:
optimizationCols = ['event_time']

import os
useDeltaOpt = (os.getenv('DATABRICKS_RUNTIME_VERSION') != None)

view_df = df.withColumn("event_dt", f.to_date(f.col(ts_col))) \
@@ -33,9 +35,9 @@ def write(tsdf, spark, tabName, optimizationCols = None):
try:
spark.sql("optimize {} zorder by {}".format(tabName, "(" + ",".join(partitionCols + optimizationCols) + ")"))
except Exception as e:
print("Delta optimizations attempted, but was not successful.\nError: {}".format(e))
logging.error("Delta optimizations attempted, but was not successful.\nError: {}".format(e))
else:
print("Delta optimizations attempted on a non-Databricks platform. Switch to use Databricks Runtime to get optimization advantages.")
logging.warning("Delta optimizations attempted on a non-Databricks platform. Switch to use Databricks Runtime to get optimization advantages.")



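Note on consuming these log calls: the print statements in io.py are replaced with root-level logging.error and logging.warning, so the messages only surface once the caller configures logging. A minimal sketch (not part of this PR, names are illustrative) of how a user of tempo might do that:

```python
import logging

# Minimal sketch: route tempo's root-logger output to the console.
# basicConfig only takes effect if the root logger has no handlers yet.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=logging.INFO,
)

# After this, a call such as tempo.io.write(tsdf, spark, "my_table") would emit
# its optimize warning/error through the configured handler instead of stdout.
```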
7 changes: 4 additions & 3 deletions python/tempo/resample.py
@@ -1,10 +1,11 @@
import tempo

import logging

import pyspark.sql.functions as f
from datetime import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# define global frequency options
import tempo

SEC = 'sec'
MIN = 'min'
16 changes: 9 additions & 7 deletions python/tempo/tsdf.py
@@ -1,8 +1,13 @@
import pyspark.sql.functions as f
from pyspark.sql.window import Window
import tempo.resample as rs
import tempo.io as tio

import logging
from functools import reduce

import pyspark.sql.functions as f
from pyspark.sql.window import Window


class TSDF:

def __init__(self, df, ts_col="event_ts", partition_cols=None, sequence_col = None):
@@ -61,7 +66,6 @@ def __addPrefixToColumns(self,col_list,prefix):
"""
Add prefix to all specified columns.
"""
from functools import reduce

df = reduce(lambda df, idx: df.withColumnRenamed(col_list[idx], '_'.join([prefix,col_list[idx]])),
range(len(col_list)), self.df)
@@ -74,7 +78,6 @@ def __addColumnsFromOtherDF(self, other_cols):
"""
Add columns from some other DF as lit(None), as pre-step before union.
"""
from functools import reduce
new_df = reduce(lambda df, idx: df.withColumn(other_cols[idx], f.lit(None)), range(len(other_cols)), self.df)

return TSDF(new_df, self.ts_col, self.partitionCols)
@@ -87,7 +90,6 @@ def __combineTSDF(self, ts_df_right, combined_ts_col):
return TSDF(combined_df, combined_ts_col, self.partitionCols)

def __getLastRightRow(self, left_ts_col, right_cols, sequence_col, tsPartitionVal):
from functools import reduce
"""Get last right value of each right column (inc. right timestamp) for each self.ts_col value

self.ts_col, which is the combined time-stamp column of both left and right dataframe, is dropped at the end
@@ -118,7 +120,7 @@ def __getLastRightRow(self, left_ts_col, right_cols, sequence_col, tsPartitionVal):
any_blank_vals = (df.agg({column: 'min'}).collect()[0][0] == 0)
newCol = column.replace("non_null_ct", "")
if any_blank_vals:
print("Column " + newCol + " had no values within the lookback window. Consider using a larger window to avoid missing values. If this is the first record in the data frame, this warning can be ignored.")
logging.warning("Column " + newCol + " had no values within the lookback window. Consider using a larger window to avoid missing values. If this is the first record in the data frame, this warning can be ignored.")
df = df.drop(column)


@@ -216,7 +218,7 @@ def asofJoin(self, right_tsdf, left_prefix=None, right_prefix="right", tsPartiti
"""

if (tsPartitionVal is not None):
print("WARNING: You are using the skew version of the AS OF join. This may result in null values if there are any values outside of the maximum lookback. For maximum efficiency, choose smaller values of maximum lookback, trading off performance and potential blank AS OF values for sparse keys")
logging.warning("You are using the skew version of the AS OF join. This may result in null values if there are any values outside of the maximum lookback. For maximum efficiency, choose smaller values of maximum lookback, trading off performance and potential blank AS OF values for sparse keys")

# Check whether partition columns have same name in both dataframes
self.__checkPartitionCols(right_tsdf)
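As with io.py, tsdf.py now logs through the root logger via logging.warning. A per-module logger is a common alternative that lets downstream code filter tempo's output separately; the sketch below is hypothetical and not part of this change:

```python
import logging

# Hypothetical alternative to the root-logger calls above (not in this PR):
# a named logger scoped to the module, e.g. "tempo.tsdf".
logger = logging.getLogger(__name__)

def warn_skew_join():
    # Same message as the asofJoin warning, routed through the module logger so
    # callers can silence it with logging.getLogger("tempo.tsdf").setLevel(...).
    logger.warning(
        "You are using the skew version of the AS OF join. "
        "This may result in null values if there are any values "
        "outside of the maximum lookback."
    )
```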
3 changes: 2 additions & 1 deletion python/tests/tests.py
@@ -1,3 +1,4 @@
import logging
import unittest

import pyspark.sql.functions as F
@@ -573,7 +574,7 @@ def test_write_to_delta(self):
# using lookback of 20 minutes
#featured_df = tsdf_left.resample(freq = "min", func = "closest_lead").df
tsdf_left.write(self.spark, "my_table")
print('delta table count ' + str(self.spark.table("my_table").count()))
logging.info('delta table count ' + str(self.spark.table("my_table").count()))

# should be equal to the expected dataframe
assert self.spark.table("my_table").count() == 7
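Because the test now emits a log record rather than printing, unittest's assertLogs can verify it. An illustrative sketch with a stand-in log call in place of the real write and count:

```python
import logging
import unittest

class LoggingAssertionExample(unittest.TestCase):
    def test_row_count_is_logged(self):
        # assertLogs fails if nothing is logged at INFO or above inside the block.
        with self.assertLogs(level="INFO") as captured:
            # Stand-in for tsdf_left.write(...) followed by the logged table count.
            logging.info("delta table count 7")
        self.assertTrue(any("delta table count" in line for line in captured.output))

if __name__ == "__main__":
    unittest.main()
```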