Skip to content

Commit

Permalink
feat: count entries to the focus command (#12)
Browse files Browse the repository at this point in the history
closes #7
  • Loading branch information
cnishina committed Nov 21, 2022
1 parent 2d5e0d0 commit f9c4db7
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
35 changes: 35 additions & 0 deletions focus.py
Expand Up @@ -12,6 +12,7 @@
_CLIENT_ID = os.getenv("CLIENT_ID")

_DATA_CSV = "focus"
_SUMMARY_CSV = "sumary"
_TWITCH_STREAM_API_ENDPOINT = "https://api.twitch.tv/helix/streams?{}"


Expand Down Expand Up @@ -48,6 +49,7 @@ def _maybe_rotate_files(timestamp: datetime.datetime):
last_local.year != curr_local.year
or last_local.month != curr_local.month
):
_tally_focus_counts(last_local.year, last_local.month)
_rotate_files(last_local.year, last_local.month)
# Return early since we just need to check a single timestamp.
return
Expand All @@ -61,6 +63,39 @@ def _convert_to_local_datetime(timestamp):
return timestamp + timestamp.tzinfo.utcoffset(timestamp)


def _tally_focus_counts(year: int, month: int):
"""Tallies focus commands once per user per day.
The tallying must happen before rotating the file.
Args:
year: The 4 digit number representing the year.
month: The month as a number. This will be formatted to MM.
"""
# dictionary of author and a list of dates
tallies: Dict[str, Union[Set[str], List[str]]] = {}
with open(f"{_DATA_CSV}.csv", "r", newline="") as csvfile:
data_reader = csv.reader(csvfile, delimiter=",", quotechar='"')
for row in data_reader:
author = row[0]
timestamp = datetime.datetime.fromisoformat(row[1])
local_timestamp = _convert_to_local_datetime(timestamp)

date = local_timestamp.strftime("%Y-%m-%d")
tallies.setdefault(author, [])
tallies[author].append(date)

# This helps remove duplicate dates.
for author, dates in tallies.items():
tallies[author] = set(dates)

month = "{:02d}".format(month)
with open(f"{_SUMMARY_CSV}_{year}_{month}.csv", "a", newline="") as csvfile:
data_writer = csv.writer(csvfile, delimiter=",", quotechar='"')
for author, date_count in tallies.items():
data_writer.writerow([author, len(date_count)])


def _rotate_files(year: int, month: int):
"""Rotates the file and removes the original file.
Expand Down
42 changes: 42 additions & 0 deletions focus_test.py
Expand Up @@ -8,9 +8,11 @@
# This ensures that the file we write is not the default one.
# The prefix is for testing only.
focus._DATA_CSV = "test_focus"
focus._SUMMARY_CSV = "test_summary"

_TEST_FILE = "test_focus.csv"
_ROTATED_TEST_FILE = "test_focus_2022_09.csv"
_TEST_SUMMARY_FILE = "test_summary_2022_09.csv"


def initialize_file():
Expand Down Expand Up @@ -38,6 +40,11 @@ def remove_files():
except:
# no-op: tries to remove the rotated file.
pass
try:
os.remove(_TEST_SUMMARY_FILE)
except:
# no-op: tries to remove the summary file.
pass


class FocusTest(unittest.TestCase):
Expand Down Expand Up @@ -113,6 +120,41 @@ def _is_streaming():
count += 1
self.assertEqual(count, 3)

def test_tally_focus_counts(self):
# The test counts for September posts
# The initialize_file writes the same focus command 3 times at 11:59 pm
# on 2002/09/30.
with open(f"{focus._DATA_CSV}.csv", "a", newline="") as csvfile:
data_writer = csv.writer(csvfile, delimiter=",", quotechar='"')
author = "mr bear's friend"
timestamp = datetime.datetime.fromisoformat(
"2022-09-10T06:59:00.000000-07:00"
)
message = "Doing it and doing it and doing it well."
data_writer.writerow([author, timestamp.isoformat(), message])
timestamp = datetime.datetime.fromisoformat(
"2022-09-11T06:59:00.000000-07:00"
)
data_writer.writerow([author, timestamp.isoformat(), message])
timestamp = datetime.datetime.fromisoformat(
"2022-09-12T06:59:00.000000-07:00"
)
data_writer.writerow([author, timestamp.isoformat(), message])
timestamp = datetime.datetime.fromisoformat(
"2022-09-13T06:59:00.000000-07:00"
)
message = "I represent Queens, she was raised out in Brooklyn."
data_writer.writerow([author, timestamp.isoformat(), message])
focus._tally_focus_counts(2022, 9)

tallies: Dict[str, int] = {}
with open(_TEST_SUMMARY_FILE, "r", newline="") as csvfile:
data_reader = csv.reader(csvfile, delimiter=",", quotechar='"')
for row in data_reader:
tallies[row[0]] = row[1]
self.assertEqual(tallies["mr bear"], "1")
self.assertEqual(tallies["mr bear's friend"], "4")


if __name__ == "__main__":
unittest.main()

0 comments on commit f9c4db7

Please sign in to comment.