diff --git a/.github/workflows/lint_python.yml b/.github/workflows/lint_python.yml index fb066d28..df2e241b 100644 --- a/.github/workflows/lint_python.yml +++ b/.github/workflows/lint_python.yml @@ -6,14 +6,14 @@ jobs: steps: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 - - run: pip install bandit black codespell flake8 isort mypy pytest pyupgrade safety + - run: pip install bandit black codespell flake8 isort mypy pytest pyupgrade safety types-requests - run: bandit -r . || true - run: black --check . || true - run: codespell --ignore-words-list="followings" --quiet-level=2 - run: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - run: isort --check-only --profile black . || true - run: pip install -r requirements.txt - - run: mypy --ignore-missing-imports . + - run: mypy --ignore-missing-imports --install-types . - run: pytest . || true - run: pytest --doctest-modules . || true - run: shopt -s globstar && pyupgrade --py36-plus **/*.py || true diff --git a/README.md b/README.md index 56ee829f..2ae64dfd 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Osintgram offers an interactive shell to perform analysis on Instagram account o - addrs Get all registered addressed by target photos - captions Get user's photos captions - comments Get total comments of target's posts +- commentdata Get a list of all the comments on the target's posts - followers Get target followers - followings Get users followed by target - fwersemail Get email of target followers diff --git a/doc/COMMANDS.md b/doc/COMMANDS.md index 3d7271c5..27fc5633 100644 --- a/doc/COMMANDS.md +++ b/doc/COMMANDS.md @@ -28,6 +28,9 @@ The list has post, address and date fields. ### captions Return a list of all captions used by target in his photos. +### commentdata +Return a list of all the comments on the target's posts + ### comments Return the total number of comments in target's posts diff --git a/main.py b/main.py index 28da7ea4..c5864eee 100644 --- a/main.py +++ b/main.py @@ -48,12 +48,16 @@ def cmdlist(): print("Get users followed by target") pc.printout("fwersemail\t") print("Get email of target followers") - pc.printout("fwingsemail\t") - print("Get email of users followed by target") pc.printout("fwersnumber\t") print("Get phone number of target followers") + pc.printout("fwersubset\t") + print("Get the list of users who follow both target1 and target2") + pc.printout("fwingsemail\t") + print("Get email of users followed by target") pc.printout("fwingsnumber\t") - print("Get phone number of users followed by target") + print("Get phone number of users followed by target") + pc.printout("fwingsubset\t") + print("Get the list of users followed by both target1 and target2") pc.printout("hashtags\t") print("Get hashtags used by target") pc.printout("info\t\t") @@ -135,9 +139,11 @@ def _quit(): 'followers': api.get_followers, 'followings': api.get_followings, 'fwersemail': api.get_fwersemail, - 'fwingsemail': api.get_fwingsemail, 'fwersnumber': api.get_fwersnumber, + 'fwersubset': api.get_followers_subset, + 'fwingsemail': api.get_fwingsemail, 'fwingsnumber': api.get_fwingsnumber, + 'fwingsubset': api.get_followings_subset, 'hashtags': api.get_hashtags, 'info': api.get_user_info, 'likes': api.get_total_likes, diff --git a/requirements.txt b/requirements.txt index 605147c8..d131c991 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ requests==2.24.0 requests-toolbelt==0.9.1 geopy>=2.0.0 -prettytable==0.7.2 +prettytable instagram-private-api==1.6.0 gnureadline>=8.0.0; platform_system != "Windows" pyreadline==2.1; platform_system == "Windows" \ No newline at end of file diff --git a/src/Osintgram.py b/src/Osintgram.py index 139b0580..4eb789c0 100644 --- a/src/Osintgram.py +++ b/src/Osintgram.py @@ -45,7 +45,7 @@ def __init__(self, target, is_file, is_json, is_cli, output_dir, clear_cookies): if not is_cli: print("\nAttempt to login...") self.login(u, p) - self.setTarget(target) + self.setTarget(target, True) self.writeFile = is_file self.jsonDump = is_json @@ -53,13 +53,14 @@ def clear_cookies(self,clear_cookies): if clear_cookies: self.clear_cache() - def setTarget(self, target): + def setTarget(self, target, show_output): self.target = target user = self.get_user(target) self.target_id = user['id'] self.is_private = user['is_private'] self.following = self.check_following() - self.__printTargetBanner__() + if(show_output): + self.__printTargetBanner__() def __get_feed__(self): data = [] @@ -107,14 +108,14 @@ def __printTargetBanner__(self): def change_target(self): pc.printout("Insert new target username: ", pc.YELLOW) line = input() - self.setTarget(line) + self.setTarget(line, True) return def get_addrs(self): if self.check_private_profile(): return - pc.printout("Searching for target localizations...\n") + pc.printout("Searching for " + self.target + " localizations...\n") data = self.__get_feed__() @@ -130,6 +131,8 @@ def get_addrs(self): address = {} for k, v in locations.items(): details = self.geolocator.reverse(k) + if not details: + continue unix_timestamp = datetime.datetime.fromtimestamp(v) address[details.address] = unix_timestamp.strftime('%Y-%m-%d %H:%M:%S') @@ -181,7 +184,7 @@ def get_captions(self): if self.check_private_profile(): return - pc.printout("Searching for target captions...\n") + pc.printout("Searching for " + self.target + " captions...\n") captions = [] @@ -239,7 +242,7 @@ def get_total_comments(self): if self.check_private_profile(): return - pc.printout("Searching for target total comments...\n") + pc.printout("Searching for " + self.target + " total comments...\n") comments_counter = 0 posts = 0 @@ -306,7 +309,7 @@ def get_comment_data(self): file_name_json = self.output_dir + "/" + self.target + "_comment_data.json" with open(file_name_json, 'w') as f: f.write("{ \"Comments\":[ \n") - f.write('\n'.join(json.dumps(comment) for comment in _comments) + ',\n') + f.write(',\n'.join(json.dumps(comment) for comment in _comments) + '\n') f.write("]} ") @@ -314,7 +317,7 @@ def get_followers(self): if self.check_private_profile(): return - pc.printout("Searching for target followers...\n") + pc.printout("Searching for " + self.target + " followers...\n") _followers = [] followers = [] @@ -332,6 +335,8 @@ def get_followers(self): results = self.api.user_followers(str(self.target_id), rank_token=rank_token, max_id=next_max_id) _followers.extend(results.get('users', [])) next_max_id = results.get('next_max_id') + sys.stdout.write("\rCatched %i followers" % len(_followers)) + sys.stdout.flush() print("\n") @@ -380,7 +385,7 @@ def get_followings(self): if self.check_private_profile(): return - pc.printout("Searching for target followings...\n") + pc.printout("Searching for " + self.target + " followings...\n") _followings = [] followings = [] @@ -397,6 +402,8 @@ def get_followings(self): results = self.api.user_following(str(self.target_id), rank_token=rank_token, max_id=next_max_id) _followings.extend(results.get('users', [])) next_max_id = results.get('next_max_id') + sys.stdout.write("\rCatched %i followings" % len(_followings)) + sys.stdout.flush() print("\n") @@ -445,7 +452,7 @@ def get_hashtags(self): if self.check_private_profile(): return - pc.printout("Searching for target hashtags...\n") + pc.printout("Searching for " + self.target + " hashtags...\n") hashtags = [] counter = 1 @@ -590,7 +597,7 @@ def get_total_likes(self): if self.check_private_profile(): return - pc.printout("Searching for target total likes...\n") + pc.printout("Searching for " + self.target + " total likes...\n") like_counter = 0 posts = 0 @@ -623,7 +630,7 @@ def get_media_type(self): if self.check_private_profile(): return - pc.printout("Searching for target captions...\n") + pc.printout("Searching for " + self.target + " captions...\n") counter = 0 photo_counter = 0 @@ -946,7 +953,7 @@ def get_user_stories(self): if self.check_private_profile(): return - pc.printout("Searching for target stories...\n") + pc.printout("Searching for " + self.target + " stories...\n") data = self.api.user_reel_media(str(self.target_id)) @@ -1160,7 +1167,7 @@ def check_following(self): def check_private_profile(self): if self.is_private and not self.following: - pc.printout("Impossible to execute command: user has private profile\n", pc.RED) + pc.printout("Impossible to execute command: " + self.target + " has private profile\n", pc.RED) send = input("Do you want send a follow request? [Y/N]: ") if send.lower() == "y": self.api.friendships_create(self.target_id) @@ -1192,8 +1199,6 @@ def get_fwersemail(self): next_max_id = data.get('next_max_id') while next_max_id: - sys.stdout.write("\rCatched %i followers email" % len(followers)) - sys.stdout.flush() results = self.api.user_followers(str(self.target_id), rank_token=rank_token, max_id=next_max_id) for user in results.get('users', []): @@ -1206,6 +1211,7 @@ def get_fwersemail(self): next_max_id = results.get('next_max_id') + print("\n") results = [] @@ -1235,12 +1241,16 @@ def get_fwersemail(self): return for follow in followers: + sys.stdout.write("\rCatched %i followers email" % len(results)) + sys.stdout.flush() user = self.api.user_info(str(follow['id'])) if 'public_email' in user['user'] and user['user']['public_email']: follow['email'] = user['user']['public_email'] if len(results) > value: break results.append(follow) + sys.stdout.write("\rCatched %i followers email" % len(results)) + sys.stdout.flush() except ClientThrottledError as e: pc.printout("\nError: Instagram blocked the requests. Please wait a few minutes before you try again.", pc.RED) @@ -1346,6 +1356,8 @@ def get_fwingsemail(self): if len(results) > value: break results.append(follow) + sys.stdout.write("\rCatched %i followings email" % len(results)) + sys.stdout.flush() except ClientThrottledError as e: pc.printout("\nError: Instagram blocked the requests. Please wait a few minutes before you try again.", pc.RED) @@ -1381,18 +1393,18 @@ def get_fwingsemail(self): else: pc.printout("Sorry! No results found :-(\n", pc.RED) - def get_fwingsnumber(self): + def get_fwersnumber(self): if self.check_private_profile(): return try: - pc.printout("Searching for phone numbers of users followed by target... this can take a few minutes\n") + pc.printout("Searching for phone numbers of users followers... this can take a few minutes\n") - followings = [] + followers = [] rank_token = AppClient.generate_uuid() - data = self.api.user_following(str(self.target_id), rank_token=rank_token) + data = self.api.user_followers(str(self.target_id), rank_token=rank_token) for user in data.get('users', []): u = { @@ -1400,12 +1412,12 @@ def get_fwingsnumber(self): 'username': user['username'], 'full_name': user['full_name'] } - followings.append(u) + followers.append(u) next_max_id = data.get('next_max_id') while next_max_id: - results = self.api.user_following(str(self.target_id), rank_token=rank_token, max_id=next_max_id) + results = self.api.user_followers(str(self.target_id), rank_token=rank_token, max_id=next_max_id) for user in results.get('users', []): u = { @@ -1413,7 +1425,7 @@ def get_fwingsnumber(self): 'username': user['username'], 'full_name': user['full_name'] } - followings.append(u) + followers.append(u) next_max_id = results.get('next_max_id') @@ -1423,7 +1435,7 @@ def get_fwingsnumber(self): value = input() if value == str("y") or value == str("yes") or value == str("Yes") or value == str("YES"): - value = len(followings) + value = len(followers) elif value == str(""): print("\n") return @@ -1443,8 +1455,8 @@ def get_fwingsnumber(self): print("\n") return - for follow in followings: - sys.stdout.write("\rCatched %i followings phone numbers" % len(results)) + for follow in followers: + sys.stdout.write("\rCatched %i followers phone numbers" % len(results)) sys.stdout.flush() user = self.api.user_info(str(follow['id'])) if 'contact_phone_number' in user['user'] and user['user']['contact_phone_number']: @@ -1452,6 +1464,8 @@ def get_fwingsnumber(self): if len(results) > value: break results.append(follow) + sys.stdout.write("\rCatched %i followers phone numbers" % len(results)) + sys.stdout.flush() except ClientThrottledError as e: pc.printout("\nError: Instagram blocked the requests. Please wait a few minutes before you try again.", pc.RED) @@ -1472,14 +1486,14 @@ def get_fwingsnumber(self): t.add_row([str(node['id']), node['username'], node['full_name'], node['contact_phone_number']]) if self.writeFile: - file_name = self.output_dir + "/" + self.target + "_fwingsnumber.txt" + file_name = self.output_dir + "/" + self.target + "_fwersnumber.txt" file = open(file_name, "w") file.write(str(t)) file.close() if self.jsonDump: - json_data['followings_phone_numbers'] = results - json_file_name = self.output_dir + "/" + self.target + "_fwingsnumber.json" + json_data['followers_phone_numbers'] = results + json_file_name = self.output_dir + "/" + self.target + "_fwersnumber.json" with open(json_file_name, 'w') as f: json.dump(json_data, f) @@ -1487,7 +1501,7 @@ def get_fwingsnumber(self): else: pc.printout("Sorry! No results found :-(\n", pc.RED) - def get_fwersnumber(self): + def get_fwingsnumber(self): if self.check_private_profile(): return @@ -1495,7 +1509,7 @@ def get_fwersnumber(self): try: - pc.printout("Searching for phone numbers of users followers... this can take a few minutes\n") + pc.printout("Searching for phone numbers of users followed by target... this can take a few minutes\n") rank_token = AppClient.generate_uuid() @@ -1551,7 +1565,7 @@ def get_fwersnumber(self): return for follow in followings: - sys.stdout.write("\rCatched %i followers phone numbers" % len(results)) + sys.stdout.write("\rCatched %i followings phone numbers" % len(results)) sys.stdout.flush() user = self.api.user_info(str(follow['id'])) if 'contact_phone_number' in user['user'] and user['user']['contact_phone_number']: @@ -1559,6 +1573,8 @@ def get_fwersnumber(self): if len(results) > value: break results.append(follow) + sys.stdout.write("\rCatched %i followings phone numbers" % len(results)) + sys.stdout.flush() except ClientThrottledError as e: pc.printout("\nError: Instagram blocked the requests. Please wait a few minutes before you try again.", pc.RED) @@ -1579,14 +1595,14 @@ def get_fwersnumber(self): t.add_row([str(node['id']), node['username'], node['full_name'], node['contact_phone_number']]) if self.writeFile: - file_name = self.output_dir + "/" + self.target + "_fwersnumber.txt" + file_name = self.output_dir + "/" + self.target + "_fwingsnumber.txt" file = open(file_name, "w") file.write(str(t)) file.close() if self.jsonDump: json_data['followings_phone_numbers'] = results - json_file_name = self.output_dir + "/" + self.target + "_fwerssnumber.json" + json_file_name = self.output_dir + "/" + self.target + "_fwingsnumber.json" with open(json_file_name, 'w') as f: json.dump(json_data, f) @@ -1654,6 +1670,207 @@ def get_comments(self): else: pc.printout("Sorry! No results found :-(\n", pc.RED) + def get_followers_subset(self): + if self.check_private_profile(): + return + + pc.printout("Searching for " + self.target + " followers...\n") + + target_1 = self.target + _followers_target_1 = [] + _followers_target_2 = [] + followers_subset = [] + + + rank_token = AppClient.generate_uuid() + data = self.api.user_followers(str(self.target_id), rank_token=rank_token) + + _followers_target_1.extend(data.get('users', [])) + + next_max_id = data.get('next_max_id') + while next_max_id: + sys.stdout.write("\rCatched %i followers" % len(_followers_target_1)) + sys.stdout.flush() + results = self.api.user_followers(str(self.target_id), rank_token=rank_token, max_id=next_max_id) + _followers_target_1.extend(results.get('users', [])) + next_max_id = results.get('next_max_id') + sys.stdout.write("\rCatched %i followers" % len(_followers_target_1)) + sys.stdout.flush() + + print("\n") + + pc.printout("Insert target two username: ", pc.YELLOW) + line = input() + self.setTarget(line, False) + target_2 = self.target + if self.check_private_profile(): + return + + + pc.printout("Searching for " + self.target + " followers...\n") + + rank_token = AppClient.generate_uuid() + data = self.api.user_followers(str(self.target_id), rank_token=rank_token) + + _followers_target_2.extend(data.get('users', [])) + + next_max_id = data.get('next_max_id') + while next_max_id: + sys.stdout.write("\rCatched %i followers" % len(_followers_target_2)) + sys.stdout.flush() + results = self.api.user_followers(str(self.target_id), rank_token=rank_token, max_id=next_max_id) + _followers_target_2.extend(results.get('users', [])) + next_max_id = results.get('next_max_id') + sys.stdout.write("\rCatched %i followers" % len(_followers_target_2)) + sys.stdout.flush() + + print("\n") + + for user in _followers_target_1: + ff = list(filter(lambda x: x['pk'] == user['pk'], _followers_target_2)) + if(len(ff) > 0): + f = { + 'id': ff[0]['pk'], + 'username': ff[0]['username'], + 'full_name': ff[0]['full_name'] + } + followers_subset.append(f) + + t = PrettyTable(['ID', 'Username', 'Full Name']) + t.align["ID"] = "l" + t.align["Username"] = "l" + t.align["Full Name"] = "l" + + json_data = {} + followings_subset_list = [] + + for node in followers_subset: + t.add_row([str(node['id']), node['username'], node['full_name']]) + + if self.jsonDump: + follow = { + 'id': node['id'], + 'username': node['username'], + 'full_name': node['full_name'] + } + followings_subset_list.append(follow) + + if self.writeFile: + file_name = self.output_dir + "/" + target_1 + "-" + target_2 + "_followers.txt" + file = open(file_name, "w") + file.write(str(t)) + file.close() + + if self.jsonDump: + json_data['followers'] = followers_subset + json_file_name = self.output_dir + "/" + target_1 + "-" + target_2 + "_followers.txt" + with open(json_file_name, 'w') as f: + json.dump(json_data, f) + + print(t) + pc.printout("Founded " + str(len(followers_subset)) + " users!\n", pc.GREEN) + + def get_followings_subset(self): + if self.check_private_profile(): + return + + pc.printout("Searching for " + self.target + " followings...\n") + + target_1 = self.target + _followings_target_1 = [] + _followings_target_2 = [] + followers_subset = [] + + + rank_token = AppClient.generate_uuid() + data = self.api.user_followers(str(self.target_id), rank_token=rank_token) + + _followings_target_1.extend(data.get('users', [])) + + next_max_id = data.get('next_max_id') + while next_max_id: + sys.stdout.write("\rCatched %i followings" % len(_followings_target_1)) + sys.stdout.flush() + results = self.api.user_followers(str(self.target_id), rank_token=rank_token, max_id=next_max_id) + _followings_target_1.extend(results.get('users', [])) + next_max_id = results.get('next_max_id') + sys.stdout.write("\rCatched %i followings" % len(_followings_target_2)) + sys.stdout.flush() + + print("\n") + + pc.printout("Insert target two username: ", pc.YELLOW) + line = input() + self.setTarget(line, False) + target_2 = self.target + if self.check_private_profile(): + return + + + pc.printout("Searching for " + self.target + " followings...\n") + + rank_token = AppClient.generate_uuid() + data = self.api.user_followers(str(self.target_id), rank_token=rank_token) + + _followings_target_2.extend(data.get('users', [])) + + next_max_id = data.get('next_max_id') + while next_max_id: + sys.stdout.write("\rCatched %i followings" % len(_followings_target_2)) + sys.stdout.flush() + results = self.api.user_followers(str(self.target_id), rank_token=rank_token, max_id=next_max_id) + _followings_target_2.extend(results.get('users', [])) + next_max_id = results.get('next_max_id') + sys.stdout.write("\rCatched %i followings" % len(_followings_target_2)) + sys.stdout.flush() + + print("\n") + + for user in _followings_target_1: + ff = list(filter(lambda x: x['pk'] == user['pk'], _followings_target_2)) + if(len(ff) > 0): + f = { + 'id': ff[0]['pk'], + 'username': ff[0]['username'], + 'full_name': ff[0]['full_name'] + } + followers_subset.append(f) + + t = PrettyTable(['ID', 'Username', 'Full Name']) + t.align["ID"] = "l" + t.align["Username"] = "l" + t.align["Full Name"] = "l" + + json_data = {} + followings_subset_list = [] + + for node in followers_subset: + t.add_row([str(node['id']), node['username'], node['full_name']]) + + if self.jsonDump: + follow = { + 'id': node['id'], + 'username': node['username'], + 'full_name': node['full_name'] + } + followings_subset_list.append(follow) + + if self.writeFile: + file_name = self.output_dir + "/" + target_1 + "-" + target_2 + "_followings.txt" + file = open(file_name, "w") + file.write(str(t)) + file.close() + + if self.jsonDump: + json_data['followings'] = followers_subset + json_file_name = self.output_dir + "/" + target_1 + "-" + target_2 + "_followings.txt" + with open(json_file_name, 'w') as f: + json.dump(json_data, f) + + print(t) + pc.printout("Founded " + str(len(followers_subset)) + " users!\n", pc.GREEN) + + def clear_cache(self): try: f = open("config/settings.json",'w')