Skip to content

Commit

Permalink
LIF-489 perf: indempotent spotify_dag_etl
Browse files Browse the repository at this point in the history
  • Loading branch information
BubbaTam committed Nov 9, 2022
1 parent d27f411 commit 3d7f643
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 15 deletions.
24 changes: 19 additions & 5 deletions dags/spotify_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,17 @@
from config_priv import PERSONAL_EMAIL


def get_recently_played_data_from_spotify():
def get_recently_played_data_from_spotify(time_of_dag):
import os

from config_priv import WORKING_DIRECTORY
from src.data import SpotifyClass
SpotifyClass.get_data_from_spotify_api()
from src.utilities import ISO_8601_to_unix_timestamp_milliseconds

os.chdir(WORKING_DIRECTORY)

time = ISO_8601_to_unix_timestamp_milliseconds(time_of_dag,"%Y-%m-%dT%H:%M:%S%z")
SpotifyClass.get_data_from_spotify_api(time)



Expand All @@ -33,15 +41,21 @@ def json_data_to_sql_file():
dag = DAG(
dag_id = "spotify_data_dag",
default_args=args,
# at minute 0 (every new full hour)
#schedule_interval = "0 * * * *",
# CRON expression for 19:45
schedule_interval = "45 19 * * *",
template_searchpath="src/sql_queries",
#schedule_interval = "45 19 * * *",
schedule_interval = "@hourly",
template_searchpath=SQL_LOCATION,
catchup=False)

with dag:
get_spotify_data = PythonOperator(
task_id="extract_spotify_data",
python_callable = get_recently_played_data_from_spotify
python_callable = get_recently_played_data_from_spotify,
op_kwargs={
'time_of_dag': '{{ ts }}'
}
)
load_data_to_sql_file = PythonOperator(
task_id="transfer_data_to_sql_file",
Expand Down
17 changes: 8 additions & 9 deletions src/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def extract_spotify_data(data):
except:
print("there is a problem saving to file")
return song_dict
def get_data_from_spotify_api():
def get_data_from_spotify_api(time):
def create_spotify_oauth():
return SpotifyOAuth(
client_id = SPOTIFY_CLIENT_ID,
Expand All @@ -63,16 +63,15 @@ def get_token():
oauth = create_spotify_oauth()
token = oauth.refresh_access_token(token_data["refresh_token"])
return token
def recently_played():
print("start of recently played")
try:
token_info = get_token()
except:
print("no log in details")

try:
token_info = get_token()
sp = spotipy.Spotify(auth=token_info["access_token"])
data = sp.current_user_recently_played(limit=50,after=time_at_start_today())
data = sp.current_user_recently_played(limit=50,after=time)
SpotifyClass.extract_spotify_data(data)
return recently_played()
except:
raise ValueError("no log in details")

def load_data_to_sql():
"""
will need to transform beforehand
Expand Down
16 changes: 15 additions & 1 deletion src/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,18 @@ def save_json_file(data_variable,folder_location, file_name:str,):
f.write(json_file)

def dummy_sql_statement():
return "select 2+2;"
return "select 2+2;"

def ISO_8601_to_unix_timestamp_milliseconds(time:str, time_format:str)-> str:
"""
Return the conversion of a time string to unix timestamp milliseconds
Examples time format strings:
2022-11-09T09:55:19+00:00 == "%Y-%m-%dT%H:%M:%S%z"
2022-11-09 == "%Y-%m-%d"
"""
datetime_object = datetime.datetime.strptime(time, time_format)
time = int(datetime_object.timestamp()) *1000
return time

0 comments on commit 3d7f643

Please sign in to comment.