Skip to content

Call onNext from Main thread dont repect subscribeOn when flatMap or switchMap #3600

@danielgomezrico

Description

@danielgomezrico

Im doing a wrapper for the facebook sdk and trying to do it with rx java but Im getting an issue with the actual thread for the observables created in my wrapper. I want to get the user object after the user log in with fb.

public class LoginPresenter extends BasePresenter<LoginView> {

public void startFacebookLogin() {
    if (!mNetworkChecker.isNetworkConnectionAvailable()) {
      mView.showNoConnectionError();
      return;
    }

    Subscription subscription = mFacebookRepository.startLoginFlow()
        .flatMap(loginResult -> mFacebookRepository.getUser())
        .compose(Transformer.applyIoSchedulers())
        .subscribe(user -> mView.startSignUpView(user), this::manageErrorFlow);

    mCompositeSubscription.add(subscription);
  }
}
public class FacebookRepository {

  private WeakReference<Activity> mActivityWeakReference;
  @Nullable private CallbackManager callbackManager;

  public FacebookRepository(String facebookPublicKey, Activity activity) {
    if (!FacebookSdk.isInitialized()) {
      FacebookSdk.sdkInitialize(com.barista_v.wapa.api.Api.getContext());
    }

    if (com.barista_v.wapa.api.Api.getLogLevel() > com.barista_v.wapa.api.Api.LOG_LEVEL_BASIC) {
      FacebookSdk.addLoggingBehavior(LoggingBehavior.REQUESTS);
    }

    mActivityWeakReference = new WeakReference<>(activity);

    FacebookSdk.setApplicationId(facebookPublicKey);
  }

  public Observable<LoginResult> startLoginFlow() {
    return Observable.create(subscriber -> {
      Activity activity = mActivityWeakReference.get();
      if (activity != null) {
        LoginManager loginManager = LoginManager.getInstance();
        callbackManager = CallbackManager.Factory.create();

        loginManager.logInWithReadPermissions(activity, getPermissions());
        loginManager.registerCallback(callbackManager, new FacebookCallback<LoginResult>() {
          @Override
          public void onSuccess(LoginResult loginResult) {
            subscriber.onNext(loginResult);
            subscriber.onCompleted();
          }

          @Override
          public void onCancel() {
            subscriber.onError(
                new FacebookLoginCancelException("Login canceled.", "Facebook Error", null));
          }

          @Override
          public void onError(FacebookException exception) {
            subscriber.onError(new FacebookLoginException(exception));
          }
        });
      } else {
        Timber.w("The activity reference is null for FacebookRepository.startLoginFlow()");
        subscriber.onError(
            new FacebookLoginException("Something happened internally.", "Facebook Error", null));
      }
    });
  }

  public Observable<User> getUser() {
    return Observable.defer(() -> {
      AccessToken accessToken = AccessToken.getCurrentAccessToken();
      String userId = accessToken.getUserId();
      Bundle params = new Bundle();
      params.putString("fields", "name,email");
      String graphPath = "/" + userId;
      GraphRequest graphRequest = new GraphRequest(accessToken, graphPath, params, HttpMethod.GET);

      JSONObject json;
      try {
        GraphResponse result = graphRequest.executeAndWait();
        json = result.getJSONObject();
      } catch (FacebookException e) {
        return Observable.error(e);
      }

      String name, email;

      try {
        name = json.getString("name");
      } catch (JSONException | JsonSyntaxException | NullPointerException e) {
        return Observable.error(e);
      }

      try {
        email = json.getString("email");
      } catch (JSONException | JsonSyntaxException | NullPointerException e) {
        com.barista_v.wapa.api.error_handling.ApiException exception =
            new UserInfoMissingException(
                "Email access permission is not granted but it is required.",
                "Invalid Facebook Permissions", e);
        return Observable.error(exception);
      }

      String photoUrl =
          String.format("https://graph.facebook.com/%s/picture?type=large", userId);

      return Observable.just(new User(name, email, photoUrl));
    });
  }

  public void onActivityResult(int requestCode, int resultCode, Intent data) {
    if (callbackManager != null) {
      callbackManager.onActivityResult(requestCode, resultCode, data);
    }
  }
}

public class Transformer {
  public static <T> Observable.Transformer<T, T> applyIoSchedulers() {
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
  }
}

The problem is that the code that execute the mFacebookRepository.getUser() is in the main thread.
Should not be the IO thread?

I create the observable and call onNext when FacebookCallback call onSuccess function and it is called in the main thread, so subscriber.onNext is called on the main thread.

Is this a bug or something Im doing wrong?

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions